hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
32/// Streaming elements of type `V` grouped by a key of type `K`.
33///
34/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
35/// order of keys is non-deterministic but the order *within* each group may be deterministic.
36///
37/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
38/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
39/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
40///
41/// Type Parameters:
42/// - `K`: the type of the key for each group
43/// - `V`: the type of the elements inside each group
44/// - `Loc`: the [`Location`] where the keyed stream is materialized
45/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
46/// - `Order`: tracks whether the elements within each group have deterministic order
47///   ([`TotalOrder`]) or not ([`NoOrder`])
48/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
49///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
50pub struct KeyedStream<
51    K,
52    V,
53    Loc,
54    Bound: Boundedness,
55    Order: Ordering = TotalOrder,
56    Retry: Retries = ExactlyOnce,
57> {
58    pub(crate) location: Loc,
59    pub(crate) ir_node: RefCell<HydroNode>,
60
61    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
62}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65    for KeyedStream<K, V, L, B, NoOrder, R>
66where
67    L: Location<'a>,
68{
69    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70        KeyedStream {
71            location: stream.location,
72            ir_node: stream.ir_node,
73            _phantom: PhantomData,
74        }
75    }
76}
77
78impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
79where
80    L: Location<'a>,
81{
82    fn defer_tick(self) -> Self {
83        KeyedStream::defer_tick(self)
84    }
85}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90    L: Location<'a>,
91{
92    type Location = Tick<L>;
93
94    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95        KeyedStream {
96            location: location.clone(),
97            ir_node: RefCell::new(HydroNode::CycleSource {
98                ident,
99                metadata: location.new_node_metadata(
100                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101                ),
102            }),
103            _phantom: PhantomData,
104        }
105    }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111    L: Location<'a>,
112{
113    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114        assert_eq!(
115            Location::id(&self.location),
116            expected_location,
117            "locations do not match"
118        );
119
120        self.location
121            .flow_state()
122            .borrow_mut()
123            .push_root(HydroRoot::CycleSink {
124                ident,
125                input: Box::new(self.ir_node.into_inner()),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132    for KeyedStream<K, V, L, B, O, R>
133where
134    L: Location<'a> + NoTick,
135{
136    type Location = L;
137
138    fn create_source(ident: syn::Ident, location: L) -> Self {
139        KeyedStream {
140            location: location.clone(),
141            ir_node: RefCell::new(HydroNode::CycleSource {
142                ident,
143                metadata: location
144                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152    for KeyedStream<K, V, L, B, O, R>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176    fn clone(&self) -> Self {
177        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179            *self.ir_node.borrow_mut() = HydroNode::Tee {
180                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181                metadata: self.location.new_node_metadata(Self::collection_kind()),
182            };
183        }
184
185        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186            KeyedStream {
187                location: self.location.clone(),
188                ir_node: HydroNode::Tee {
189                    inner: TeeNode(inner.0.clone()),
190                    metadata: metadata.clone(),
191                }
192                .into(),
193                _phantom: PhantomData,
194            }
195        } else {
196            unreachable!()
197        }
198    }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202    KeyedStream<K, V, L, B, O, R>
203{
204    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208        KeyedStream {
209            location,
210            ir_node: RefCell::new(ir_node),
211            _phantom: PhantomData,
212        }
213    }
214
215    /// Returns the [`CollectionKind`] corresponding to this type.
216    pub fn collection_kind() -> CollectionKind {
217        CollectionKind::KeyedStream {
218            bound: B::BOUND_KIND,
219            value_order: O::ORDERING_KIND,
220            value_retry: R::RETRIES_KIND,
221            key_type: stageleft::quote_type::<K>().into(),
222            value_type: stageleft::quote_type::<V>().into(),
223        }
224    }
225
226    /// Returns the [`Location`] where this keyed stream is being materialized.
227    pub fn location(&self) -> &L {
228        &self.location
229    }
230
231    /// Explicitly "casts" the keyed stream to a type with a different ordering
232    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233    /// by the type-system.
234    ///
235    /// # Non-Determinism
236    /// This function is used as an escape hatch, and any mistakes in the
237    /// provided ordering guarantee will propagate into the guarantees
238    /// for the rest of the program.
239    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240        if O::ORDERING_KIND == O2::ORDERING_KIND {
241            KeyedStream::new(self.location, self.ir_node.into_inner())
242        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243            // We can always weaken the ordering guarantee
244            KeyedStream::new(
245                self.location.clone(),
246                HydroNode::Cast {
247                    inner: Box::new(self.ir_node.into_inner()),
248                    metadata: self
249                        .location
250                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251                },
252            )
253        } else {
254            KeyedStream::new(
255                self.location.clone(),
256                HydroNode::ObserveNonDet {
257                    inner: Box::new(self.ir_node.into_inner()),
258                    trusted: false,
259                    metadata: self
260                        .location
261                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262                },
263            )
264        }
265    }
266
267    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
268    /// which is always safe because that is the weakest possible guarantee.
269    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
270        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
271        self.assume_ordering::<NoOrder>(nondet)
272    }
273
274    /// Explicitly "casts" the keyed stream to a type with a different retries
275    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
276    /// be proven by the type-system.
277    ///
278    /// # Non-Determinism
279    /// This function is used as an escape hatch, and any mistakes in the
280    /// provided retries guarantee will propagate into the guarantees
281    /// for the rest of the program.
282    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
283        if R::RETRIES_KIND == R2::RETRIES_KIND {
284            KeyedStream::new(self.location, self.ir_node.into_inner())
285        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
286            // We can always weaken the retries guarantee
287            KeyedStream::new(
288                self.location.clone(),
289                HydroNode::Cast {
290                    inner: Box::new(self.ir_node.into_inner()),
291                    metadata: self
292                        .location
293                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
294                },
295            )
296        } else {
297            KeyedStream::new(
298                self.location.clone(),
299                HydroNode::ObserveNonDet {
300                    inner: Box::new(self.ir_node.into_inner()),
301                    trusted: false,
302                    metadata: self
303                        .location
304                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
305                },
306            )
307        }
308    }
309
310    /// Flattens the keyed stream into an unordered stream of key-value pairs.
311    ///
312    /// # Example
313    /// ```rust
314    /// # use hydro_lang::prelude::*;
315    /// # use futures::StreamExt;
316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
317    /// process
318    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
319    ///     .into_keyed()
320    ///     .entries()
321    /// # }, |mut stream| async move {
322    /// // (1, 2), (1, 3), (2, 4) in any order
323    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
324    /// #     assert_eq!(stream.next().await.unwrap(), w);
325    /// # }
326    /// # }));
327    /// ```
328    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
329        Stream::new(
330            self.location.clone(),
331            HydroNode::Cast {
332                inner: Box::new(self.ir_node.into_inner()),
333                metadata: self
334                    .location
335                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
336            },
337        )
338    }
339
340    /// Flattens the keyed stream into an unordered stream of only the values.
341    ///
342    /// # Example
343    /// ```rust
344    /// # use hydro_lang::prelude::*;
345    /// # use futures::StreamExt;
346    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
347    /// process
348    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
349    ///     .into_keyed()
350    ///     .values()
351    /// # }, |mut stream| async move {
352    /// // 2, 3, 4 in any order
353    /// # for w in vec![2, 3, 4] {
354    /// #     assert_eq!(stream.next().await.unwrap(), w);
355    /// # }
356    /// # }));
357    /// ```
358    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
359        self.entries().map(q!(|(_, v)| v))
360    }
361
362    /// Transforms each value by invoking `f` on each element, with keys staying the same
363    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
364    ///
365    /// If you do not want to modify the stream and instead only want to view
366    /// each item use [`KeyedStream::inspect`] instead.
367    ///
368    /// # Example
369    /// ```rust
370    /// # use hydro_lang::prelude::*;
371    /// # use futures::StreamExt;
372    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
373    /// process
374    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
375    ///     .into_keyed()
376    ///     .map(q!(|v| v + 1))
377    /// #   .entries()
378    /// # }, |mut stream| async move {
379    /// // { 1: [3, 4], 2: [5] }
380    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
381    /// #     assert_eq!(stream.next().await.unwrap(), w);
382    /// # }
383    /// # }));
384    /// ```
385    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
386    where
387        F: Fn(V) -> U + 'a,
388    {
389        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
390        let map_f = q!({
391            let orig = f;
392            move |(k, v)| (k, orig(v))
393        })
394        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
395        .into();
396
397        KeyedStream::new(
398            self.location.clone(),
399            HydroNode::Map {
400                f: map_f,
401                input: Box::new(self.ir_node.into_inner()),
402                metadata: self
403                    .location
404                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
405            },
406        )
407    }
408
409    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
410    /// re-grouped even they are tuples; instead they will be grouped under the original key.
411    ///
412    /// If you do not want to modify the stream and instead only want to view
413    /// each item use [`KeyedStream::inspect_with_key`] instead.
414    ///
415    /// # Example
416    /// ```rust
417    /// # use hydro_lang::prelude::*;
418    /// # use futures::StreamExt;
419    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
420    /// process
421    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
422    ///     .into_keyed()
423    ///     .map_with_key(q!(|(k, v)| k + v))
424    /// #   .entries()
425    /// # }, |mut stream| async move {
426    /// // { 1: [3, 4], 2: [6] }
427    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
428    /// #     assert_eq!(stream.next().await.unwrap(), w);
429    /// # }
430    /// # }));
431    /// ```
432    pub fn map_with_key<U, F>(
433        self,
434        f: impl IntoQuotedMut<'a, F, L> + Copy,
435    ) -> KeyedStream<K, U, L, B, O, R>
436    where
437        F: Fn((K, V)) -> U + 'a,
438        K: Clone,
439    {
440        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
441        let map_f = q!({
442            let orig = f;
443            move |(k, v)| {
444                let out = orig((Clone::clone(&k), v));
445                (k, out)
446            }
447        })
448        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
449        .into();
450
451        KeyedStream::new(
452            self.location.clone(),
453            HydroNode::Map {
454                f: map_f,
455                input: Box::new(self.ir_node.into_inner()),
456                metadata: self
457                    .location
458                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
459            },
460        )
461    }
462
463    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
464    /// `f`, preserving the order of the elements within the group.
465    ///
466    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
467    /// not modify or take ownership of the values. If you need to modify the values while filtering
468    /// use [`KeyedStream::filter_map`] instead.
469    ///
470    /// # Example
471    /// ```rust
472    /// # use hydro_lang::prelude::*;
473    /// # use futures::StreamExt;
474    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
475    /// process
476    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
477    ///     .into_keyed()
478    ///     .filter(q!(|&x| x > 2))
479    /// #   .entries()
480    /// # }, |mut stream| async move {
481    /// // { 1: [3], 2: [4] }
482    /// # for w in vec![(1, 3), (2, 4)] {
483    /// #     assert_eq!(stream.next().await.unwrap(), w);
484    /// # }
485    /// # }));
486    /// ```
487    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
488    where
489        F: Fn(&V) -> bool + 'a,
490    {
491        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
492        let filter_f = q!({
493            let orig = f;
494            move |t: &(_, _)| orig(&t.1)
495        })
496        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
497        .into();
498
499        KeyedStream::new(
500            self.location.clone(),
501            HydroNode::Filter {
502                f: filter_f,
503                input: Box::new(self.ir_node.into_inner()),
504                metadata: self.location.new_node_metadata(Self::collection_kind()),
505            },
506        )
507    }
508
509    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
510    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
511    ///
512    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
513    /// not modify or take ownership of the values. If you need to modify the values while filtering
514    /// use [`KeyedStream::filter_map_with_key`] instead.
515    ///
516    /// # Example
517    /// ```rust
518    /// # use hydro_lang::prelude::*;
519    /// # use futures::StreamExt;
520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
521    /// process
522    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
523    ///     .into_keyed()
524    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
525    /// #   .entries()
526    /// # }, |mut stream| async move {
527    /// // { 1: [3], 2: [4] }
528    /// # for w in vec![(1, 3), (2, 4)] {
529    /// #     assert_eq!(stream.next().await.unwrap(), w);
530    /// # }
531    /// # }));
532    /// ```
533    pub fn filter_with_key<F>(
534        self,
535        f: impl IntoQuotedMut<'a, F, L> + Copy,
536    ) -> KeyedStream<K, V, L, B, O, R>
537    where
538        F: Fn(&(K, V)) -> bool + 'a,
539    {
540        let filter_f = f
541            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
542            .into();
543
544        KeyedStream::new(
545            self.location.clone(),
546            HydroNode::Filter {
547                f: filter_f,
548                input: Box::new(self.ir_node.into_inner()),
549                metadata: self.location.new_node_metadata(Self::collection_kind()),
550            },
551        )
552    }
553
554    /// An operator that both filters and maps each value, with keys staying the same.
555    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
556    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
557    ///
558    /// # Example
559    /// ```rust
560    /// # use hydro_lang::prelude::*;
561    /// # use futures::StreamExt;
562    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
563    /// process
564    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
565    ///     .into_keyed()
566    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
567    /// #   .entries()
568    /// # }, |mut stream| async move {
569    /// // { 1: [2], 2: [4] }
570    /// # for w in vec![(1, 2), (2, 4)] {
571    /// #     assert_eq!(stream.next().await.unwrap(), w);
572    /// # }
573    /// # }));
574    /// ```
575    pub fn filter_map<U, F>(
576        self,
577        f: impl IntoQuotedMut<'a, F, L> + Copy,
578    ) -> KeyedStream<K, U, L, B, O, R>
579    where
580        F: Fn(V) -> Option<U> + 'a,
581    {
582        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
583        let filter_map_f = q!({
584            let orig = f;
585            move |(k, v)| orig(v).map(|o| (k, o))
586        })
587        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
588        .into();
589
590        KeyedStream::new(
591            self.location.clone(),
592            HydroNode::FilterMap {
593                f: filter_map_f,
594                input: Box::new(self.ir_node.into_inner()),
595                metadata: self
596                    .location
597                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
598            },
599        )
600    }
601
602    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
603    /// re-grouped even they are tuples; instead they will be grouped under the original key.
604    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
605    ///
606    /// # Example
607    /// ```rust
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// process
612    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
613    ///     .into_keyed()
614    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
615    /// #   .entries()
616    /// # }, |mut stream| async move {
617    /// // { 2: [2] }
618    /// # for w in vec![(2, 2)] {
619    /// #     assert_eq!(stream.next().await.unwrap(), w);
620    /// # }
621    /// # }));
622    /// ```
623    pub fn filter_map_with_key<U, F>(
624        self,
625        f: impl IntoQuotedMut<'a, F, L> + Copy,
626    ) -> KeyedStream<K, U, L, B, O, R>
627    where
628        F: Fn((K, V)) -> Option<U> + 'a,
629        K: Clone,
630    {
631        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
632        let filter_map_f = q!({
633            let orig = f;
634            move |(k, v)| {
635                let out = orig((Clone::clone(&k), v));
636                out.map(|o| (k, o))
637            }
638        })
639        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
640        .into();
641
642        KeyedStream::new(
643            self.location.clone(),
644            HydroNode::FilterMap {
645                f: filter_map_f,
646                input: Box::new(self.ir_node.into_inner()),
647                metadata: self
648                    .location
649                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
650            },
651        )
652    }
653
654    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
655    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
656    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
657    ///
658    /// # Example
659    /// ```rust
660    /// # use hydro_lang::prelude::*;
661    /// # use futures::StreamExt;
662    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663    /// let tick = process.tick();
664    /// let batch = process
665    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
666    ///   .into_keyed()
667    ///   .batch(&tick, nondet!(/** test */));
668    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
669    /// batch.cross_singleton(count).all_ticks().entries()
670    /// # }, |mut stream| async move {
671    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
672    /// # for w in vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))] {
673    /// #     assert_eq!(stream.next().await.unwrap(), w);
674    /// # }
675    /// # }));
676    /// ```
677    pub fn cross_singleton<O2>(
678        self,
679        other: impl Into<Optional<O2, L, Bounded>>,
680    ) -> KeyedStream<K, (V, O2), L, B, O, R>
681    where
682        O2: Clone,
683    {
684        let other: Optional<O2, L, Bounded> = other.into();
685        check_matching_location(&self.location, &other.location);
686
687        Stream::new(
688            self.location.clone(),
689            HydroNode::CrossSingleton {
690                left: Box::new(self.ir_node.into_inner()),
691                right: Box::new(other.ir_node.into_inner()),
692                metadata: self
693                    .location
694                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
695            },
696        )
697        .map(q!(|((k, v), o2)| (k, (v, o2))))
698        .into_keyed()
699    }
700
701    /// For each value `v` in each group, transform `v` using `f` and then treat the
702    /// result as an [`Iterator`] to produce values one by one within the same group.
703    /// The implementation for [`Iterator`] for the output type `I` must produce items
704    /// in a **deterministic** order.
705    ///
706    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
707    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
708    ///
709    /// # Example
710    /// ```rust
711    /// # use hydro_lang::prelude::*;
712    /// # use futures::StreamExt;
713    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714    /// process
715    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
716    ///     .into_keyed()
717    ///     .flat_map_ordered(q!(|x| x))
718    /// #   .entries()
719    /// # }, |mut stream| async move {
720    /// // { 1: [2, 3, 4], 2: [5, 6] }
721    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
722    /// #     assert_eq!(stream.next().await.unwrap(), w);
723    /// # }
724    /// # }));
725    /// ```
726    pub fn flat_map_ordered<U, I, F>(
727        self,
728        f: impl IntoQuotedMut<'a, F, L> + Copy,
729    ) -> KeyedStream<K, U, L, B, O, R>
730    where
731        I: IntoIterator<Item = U>,
732        F: Fn(V) -> I + 'a,
733        K: Clone,
734    {
735        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
736        let flat_map_f = q!({
737            let orig = f;
738            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
739        })
740        .splice_fn1_ctx::<(K, V), _>(&self.location)
741        .into();
742
743        KeyedStream::new(
744            self.location.clone(),
745            HydroNode::FlatMap {
746                f: flat_map_f,
747                input: Box::new(self.ir_node.into_inner()),
748                metadata: self
749                    .location
750                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
751            },
752        )
753    }
754
755    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
756    /// for the output type `I` to produce items in any order.
757    ///
758    /// # Example
759    /// ```rust
760    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
761    /// # use futures::StreamExt;
762    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
763    /// process
764    ///     .source_iter(q!(vec![
765    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
766    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
767    ///     ]))
768    ///     .into_keyed()
769    ///     .flat_map_unordered(q!(|x| x))
770    /// #   .entries()
771    /// # }, |mut stream| async move {
772    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
773    /// # let mut results = Vec::new();
774    /// # for _ in 0..4 {
775    /// #     results.push(stream.next().await.unwrap());
776    /// # }
777    /// # results.sort();
778    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
779    /// # }));
780    /// ```
781    pub fn flat_map_unordered<U, I, F>(
782        self,
783        f: impl IntoQuotedMut<'a, F, L> + Copy,
784    ) -> KeyedStream<K, U, L, B, NoOrder, R>
785    where
786        I: IntoIterator<Item = U>,
787        F: Fn(V) -> I + 'a,
788        K: Clone,
789    {
790        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
791        let flat_map_f = q!({
792            let orig = f;
793            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
794        })
795        .splice_fn1_ctx::<(K, V), _>(&self.location)
796        .into();
797
798        KeyedStream::new(
799            self.location.clone(),
800            HydroNode::FlatMap {
801                f: flat_map_f,
802                input: Box::new(self.ir_node.into_inner()),
803                metadata: self
804                    .location
805                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
806            },
807        )
808    }
809
810    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
811    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
812    /// items in a **deterministic** order.
813    ///
814    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
815    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
816    ///
817    /// # Example
818    /// ```rust
819    /// # use hydro_lang::prelude::*;
820    /// # use futures::StreamExt;
821    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
822    /// process
823    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
824    ///     .into_keyed()
825    ///     .flatten_ordered()
826    /// #   .entries()
827    /// # }, |mut stream| async move {
828    /// // { 1: [2, 3, 4], 2: [5, 6] }
829    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
830    /// #     assert_eq!(stream.next().await.unwrap(), w);
831    /// # }
832    /// # }));
833    /// ```
834    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
835    where
836        V: IntoIterator<Item = U>,
837        K: Clone,
838    {
839        self.flat_map_ordered(q!(|d| d))
840    }
841
842    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
843    /// for the value type `V` to produce items in any order.
844    ///
845    /// # Example
846    /// ```rust
847    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
848    /// # use futures::StreamExt;
849    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
850    /// process
851    ///     .source_iter(q!(vec![
852    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
853    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
854    ///     ]))
855    ///     .into_keyed()
856    ///     .flatten_unordered()
857    /// #   .entries()
858    /// # }, |mut stream| async move {
859    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
860    /// # let mut results = Vec::new();
861    /// # for _ in 0..4 {
862    /// #     results.push(stream.next().await.unwrap());
863    /// # }
864    /// # results.sort();
865    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
866    /// # }));
867    /// ```
868    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
869    where
870        V: IntoIterator<Item = U>,
871        K: Clone,
872    {
873        self.flat_map_unordered(q!(|d| d))
874    }
875
876    /// An operator which allows you to "inspect" each element of a stream without
877    /// modifying it. The closure `f` is called on a reference to each value. This is
878    /// mainly useful for debugging, and should not be used to generate side-effects.
879    ///
880    /// # Example
881    /// ```rust
882    /// # use hydro_lang::prelude::*;
883    /// # use futures::StreamExt;
884    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
885    /// process
886    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
887    ///     .into_keyed()
888    ///     .inspect(q!(|v| println!("{}", v)))
889    /// #   .entries()
890    /// # }, |mut stream| async move {
891    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
892    /// #     assert_eq!(stream.next().await.unwrap(), w);
893    /// # }
894    /// # }));
895    /// ```
896    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
897    where
898        F: Fn(&V) + 'a,
899    {
900        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
901        let inspect_f = q!({
902            let orig = f;
903            move |t: &(_, _)| orig(&t.1)
904        })
905        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
906        .into();
907
908        KeyedStream::new(
909            self.location.clone(),
910            HydroNode::Inspect {
911                f: inspect_f,
912                input: Box::new(self.ir_node.into_inner()),
913                metadata: self.location.new_node_metadata(Self::collection_kind()),
914            },
915        )
916    }
917
918    /// An operator which allows you to "inspect" each element of a stream without
919    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
920    /// mainly useful for debugging, and should not be used to generate side-effects.
921    ///
922    /// # Example
923    /// ```rust
924    /// # use hydro_lang::prelude::*;
925    /// # use futures::StreamExt;
926    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
927    /// process
928    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
929    ///     .into_keyed()
930    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
931    /// #   .entries()
932    /// # }, |mut stream| async move {
933    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
934    /// #     assert_eq!(stream.next().await.unwrap(), w);
935    /// # }
936    /// # }));
937    /// ```
938    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
939    where
940        F: Fn(&(K, V)) + 'a,
941    {
942        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
943
944        KeyedStream::new(
945            self.location.clone(),
946            HydroNode::Inspect {
947                f: inspect_f,
948                input: Box::new(self.ir_node.into_inner()),
949                metadata: self.location.new_node_metadata(Self::collection_kind()),
950            },
951        )
952    }
953
954    /// An operator which allows you to "name" a `HydroNode`.
955    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
956    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
957        {
958            let mut node = self.ir_node.borrow_mut();
959            let metadata = node.metadata_mut();
960            metadata.tag = Some(name.to_string());
961        }
962        self
963    }
964}
965
966impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
967    KeyedStream<K, V, L, Unbounded, O, R>
968{
969    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
970    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
971    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
972    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
973    ///
974    /// Currently, both input streams must be [`Unbounded`].
975    ///
976    /// # Example
977    /// ```rust
978    /// # use hydro_lang::prelude::*;
979    /// # use futures::StreamExt;
980    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
981    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
982    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
983    /// numbers1.interleave(numbers2)
984    /// #   .entries()
985    /// # }, |mut stream| async move {
986    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
987    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
988    /// #     assert_eq!(stream.next().await.unwrap(), w);
989    /// # }
990    /// # }));
991    /// ```
992    pub fn interleave<O2: Ordering, R2: Retries>(
993        self,
994        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
995    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
996    where
997        R: MinRetries<R2>,
998    {
999        let tick = self.location.tick();
1000        // Because the outputs are unordered, we can interleave batches from both streams.
1001        let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1002        self.batch(&tick, nondet_batch_interleaving)
1003            .weakest_ordering()
1004            .chain(
1005                other
1006                    .batch(&tick, nondet_batch_interleaving)
1007                    .weakest_ordering(),
1008            )
1009            .all_ticks()
1010    }
1011}
1012
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant settles two things at once: whether an element is emitted for the current
/// input, and whether future inputs are still processed.
///
/// The common comparison and formatting traits are derived so that generator results can be
/// printed and compared; each impl is available whenever `T` implements the same trait.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1025
1026impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1027where
1028    K: Eq + Hash,
1029    L: Location<'a>,
1030{
1031    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1032    ///
1033    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
1034    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1035    /// early by returning `None`.
1036    ///
1037    /// The function takes a mutable reference to the accumulator and the current element, and returns
1038    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1039    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1040    ///
1041    /// # Example
1042    /// ```rust
1043    /// # use hydro_lang::prelude::*;
1044    /// # use futures::StreamExt;
1045    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046    /// process
1047    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1048    ///     .into_keyed()
1049    ///     .scan(
1050    ///         q!(|| 0),
1051    ///         q!(|acc, x| {
1052    ///             *acc += x;
1053    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1054    ///         }),
1055    ///     )
1056    /// #   .entries()
1057    /// # }, |mut stream| async move {
1058    /// // Output: { 0: [1], 1: [3, 7] }
1059    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1060    /// #     assert_eq!(stream.next().await.unwrap(), w);
1061    /// # }
1062    /// # }));
1063    /// ```
1064    pub fn scan<A, U, I, F>(
1065        self,
1066        init: impl IntoQuotedMut<'a, I, L> + Copy,
1067        f: impl IntoQuotedMut<'a, F, L> + Copy,
1068    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1069    where
1070        K: Clone,
1071        I: Fn() -> A + 'a,
1072        F: Fn(&mut A, V) -> Option<U> + 'a,
1073    {
1074        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1075        self.generator(
1076            init,
1077            q!({
1078                let orig = f;
1079                move |state, v| {
1080                    if let Some(out) = orig(state, v) {
1081                        Generate::Yield(out)
1082                    } else {
1083                        Generate::Break
1084                    }
1085                }
1086            }),
1087        )
1088    }
1089
1090    /// Iteratively processes the elements in each group using a state machine that can yield
1091    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1092    /// syntax in Rust, without requiring special syntax.
1093    ///
1094    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1095    /// state for each group. The second argument defines the processing logic, taking in a
1096    /// mutable reference to the group's state and the value to be processed. It emits a
1097    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1098    /// should be processed.
1099    ///
1100    /// # Example
1101    /// ```rust
1102    /// # use hydro_lang::prelude::*;
1103    /// # use futures::StreamExt;
1104    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1105    /// process
1106    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1107    ///     .into_keyed()
1108    ///     .generator(
1109    ///         q!(|| 0),
1110    ///         q!(|acc, x| {
1111    ///             *acc += x;
1112    ///             if *acc > 100 {
1113    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1114    ///                     "done!".to_string()
1115    ///                 )
1116    ///             } else if *acc % 2 == 0 {
1117    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1118    ///                     "even".to_string()
1119    ///                 )
1120    ///             } else {
1121    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1122    ///             }
1123    ///         }),
1124    ///     )
1125    /// #   .entries()
1126    /// # }, |mut stream| async move {
1127    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1128    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1129    /// #     assert_eq!(stream.next().await.unwrap(), w);
1130    /// # }
1131    /// # }));
1132    /// ```
1133    pub fn generator<A, U, I, F>(
1134        self,
1135        init: impl IntoQuotedMut<'a, I, L> + Copy,
1136        f: impl IntoQuotedMut<'a, F, L> + Copy,
1137    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1138    where
1139        K: Clone,
1140        I: Fn() -> A + 'a,
1141        F: Fn(&mut A, V) -> Generate<U> + 'a,
1142    {
1143        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1144        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1145
1146        let scan_init = q!(|| HashMap::new())
1147            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1148            .into();
1149        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1150            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1151            if let Some(existing_state_value) = existing_state {
1152                match f(existing_state_value, v) {
1153                    Generate::Yield(out) => Some(Some((k, out))),
1154                    Generate::Return(out) => {
1155                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1156                        Some(Some((k, out)))
1157                    }
1158                    Generate::Break => {
1159                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1160                        Some(None)
1161                    }
1162                    Generate::Continue => Some(None),
1163                }
1164            } else {
1165                Some(None)
1166            }
1167        })
1168        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1169        .into();
1170
1171        let scan_node = HydroNode::Scan {
1172            init: scan_init,
1173            acc: scan_f,
1174            input: Box::new(self.ir_node.into_inner()),
1175            metadata: self.location.new_node_metadata(Stream::<
1176                Option<(K, U)>,
1177                L,
1178                B,
1179                TotalOrder,
1180                ExactlyOnce,
1181            >::collection_kind()),
1182        };
1183
1184        let flatten_f = q!(|d| d)
1185            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1186            .into();
1187        let flatten_node = HydroNode::FlatMap {
1188            f: flatten_f,
1189            input: Box::new(scan_node),
1190            metadata: self.location.new_node_metadata(KeyedStream::<
1191                K,
1192                U,
1193                L,
1194                B,
1195                TotalOrder,
1196                ExactlyOnce,
1197            >::collection_kind()),
1198        };
1199
1200        KeyedStream::new(self.location.clone(), flatten_node)
1201    }
1202
1203    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1204    /// in-order across the values in each group. But the aggregation function returns a boolean,
1205    /// which when true indicates that the aggregated result is complete and can be released to
1206    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1207    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1208    /// normal stream elements.
1209    ///
1210    /// # Example
1211    /// ```rust
1212    /// # use hydro_lang::prelude::*;
1213    /// # use futures::StreamExt;
1214    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1215    /// process
1216    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1217    ///     .into_keyed()
1218    ///     .fold_early_stop(
1219    ///         q!(|| 0),
1220    ///         q!(|acc, x| {
1221    ///             *acc += x;
1222    ///             x % 2 == 0
1223    ///         }),
1224    ///     )
1225    /// #   .entries()
1226    /// # }, |mut stream| async move {
1227    /// // Output: { 0: 2, 1: 9 }
1228    /// # for w in vec![(0, 2), (1, 9)] {
1229    /// #     assert_eq!(stream.next().await.unwrap(), w);
1230    /// # }
1231    /// # }));
1232    /// ```
1233    pub fn fold_early_stop<A, I, F>(
1234        self,
1235        init: impl IntoQuotedMut<'a, I, L> + Copy,
1236        f: impl IntoQuotedMut<'a, F, L> + Copy,
1237    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1238    where
1239        K: Clone,
1240        I: Fn() -> A + 'a,
1241        F: Fn(&mut A, V) -> bool + 'a,
1242    {
1243        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1244        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1245        let out_without_bound_cast = self.generator(
1246            q!(move || Some(init())),
1247            q!(move |key_state, v| {
1248                if let Some(key_state_value) = key_state.as_mut() {
1249                    if f(key_state_value, v) {
1250                        Generate::Return(key_state.take().unwrap())
1251                    } else {
1252                        Generate::Continue
1253                    }
1254                } else {
1255                    unreachable!()
1256                }
1257            }),
1258        );
1259
1260        KeyedSingleton::new(
1261            out_without_bound_cast.location.clone(),
1262            HydroNode::Cast {
1263                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1264                metadata: out_without_bound_cast
1265                    .location
1266                    .new_node_metadata(
1267                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1268                    ),
1269            },
1270        )
1271    }
1272
1273    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1274    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1275    /// otherwise the first element would be non-deterministic.
1276    ///
1277    /// # Example
1278    /// ```rust
1279    /// # use hydro_lang::prelude::*;
1280    /// # use futures::StreamExt;
1281    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1282    /// process
1283    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1284    ///     .into_keyed()
1285    ///     .first()
1286    /// #   .entries()
1287    /// # }, |mut stream| async move {
1288    /// // Output: { 0: 2, 1: 3 }
1289    /// # for w in vec![(0, 2), (1, 3)] {
1290    /// #     assert_eq!(stream.next().await.unwrap(), w);
1291    /// # }
1292    /// # }));
1293    /// ```
1294    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1295    where
1296        K: Clone,
1297    {
1298        self.fold_early_stop(
1299            q!(|| None),
1300            q!(|acc, v| {
1301                *acc = Some(v);
1302                true
1303            }),
1304        )
1305        .map(q!(|v| v.unwrap()))
1306    }
1307
1308    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1309    ///
1310    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1311    /// to depend on the order of elements in the group.
1312    ///
1313    /// If the input and output value types are the same and do not require initialization then use
1314    /// [`KeyedStream::reduce`].
1315    ///
1316    /// # Example
1317    /// ```rust
1318    /// # use hydro_lang::prelude::*;
1319    /// # use futures::StreamExt;
1320    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1321    /// let tick = process.tick();
1322    /// let numbers = process
1323    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1324    ///     .into_keyed();
1325    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1326    /// batch
1327    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
1328    ///     .entries()
1329    ///     .all_ticks()
1330    /// # }, |mut stream| async move {
1331    /// // (1, 5), (2, 7)
1332    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1333    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1334    /// # }));
1335    /// ```
1336    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1337        self,
1338        init: impl IntoQuotedMut<'a, I, L>,
1339        comb: impl IntoQuotedMut<'a, F, L>,
1340    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1341        let init = init.splice_fn0_ctx(&self.location).into();
1342        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1343
1344        KeyedSingleton::new(
1345            self.location.clone(),
1346            HydroNode::FoldKeyed {
1347                init,
1348                acc: comb,
1349                input: Box::new(self.ir_node.into_inner()),
1350                metadata: self.location.new_node_metadata(KeyedSingleton::<
1351                    K,
1352                    A,
1353                    L,
1354                    B::WhenValueUnbounded,
1355                >::collection_kind()),
1356            },
1357        )
1358    }
1359
1360    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1361    ///
1362    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1363    /// to depend on the order of elements in the stream.
1364    ///
1365    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1366    ///
1367    /// # Example
1368    /// ```rust
1369    /// # use hydro_lang::prelude::*;
1370    /// # use futures::StreamExt;
1371    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1372    /// let tick = process.tick();
1373    /// let numbers = process
1374    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1375    ///     .into_keyed();
1376    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1377    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1378    /// # }, |mut stream| async move {
1379    /// // (1, 5), (2, 7)
1380    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1381    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1382    /// # }));
1383    /// ```
1384    pub fn reduce<F: Fn(&mut V, V) + 'a>(
1385        self,
1386        comb: impl IntoQuotedMut<'a, F, L>,
1387    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1388        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1389
1390        KeyedSingleton::new(
1391            self.location.clone(),
1392            HydroNode::ReduceKeyed {
1393                f,
1394                input: Box::new(self.ir_node.into_inner()),
1395                metadata: self.location.new_node_metadata(KeyedSingleton::<
1396                    K,
1397                    V,
1398                    L,
1399                    B::WhenValueUnbounded,
1400                >::collection_kind()),
1401            },
1402        )
1403    }
1404
1405    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1406    ///
1407    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1408    /// to depend on the order of elements in the stream.
1409    ///
1410    /// # Example
1411    /// ```rust
1412    /// # use hydro_lang::prelude::*;
1413    /// # use futures::StreamExt;
1414    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1415    /// let tick = process.tick();
1416    /// let watermark = tick.singleton(q!(1));
1417    /// let numbers = process
1418    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1419    ///     .into_keyed();
1420    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1421    /// batch
1422    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1423    ///     .entries()
1424    ///     .all_ticks()
1425    /// # }, |mut stream| async move {
1426    /// // (2, 204)
1427    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1428    /// # }));
1429    /// ```
1430    pub fn reduce_watermark<O, F>(
1431        self,
1432        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1433        comb: impl IntoQuotedMut<'a, F, L>,
1434    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1435    where
1436        O: Clone,
1437        F: Fn(&mut V, V) + 'a,
1438    {
1439        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1440        check_matching_location(&self.location.root(), other.location.outer());
1441        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1442
1443        KeyedSingleton::new(
1444            self.location.clone(),
1445            HydroNode::ReduceKeyedWatermark {
1446                f,
1447                input: Box::new(self.ir_node.into_inner()),
1448                watermark: Box::new(other.ir_node.into_inner()),
1449                metadata: self.location.new_node_metadata(KeyedSingleton::<
1450                    K,
1451                    V,
1452                    L,
1453                    B::WhenValueUnbounded,
1454                >::collection_kind()),
1455            },
1456        )
1457    }
1458}
1459
1460impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1461where
1462    K: Eq + Hash,
1463    L: Location<'a>,
1464{
1465    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1466    ///
1467    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1468    ///
1469    /// If the input and output value types are the same and do not require initialization then use
1470    /// [`KeyedStream::reduce_commutative`].
1471    ///
1472    /// # Example
1473    /// ```rust
1474    /// # use hydro_lang::prelude::*;
1475    /// # use futures::StreamExt;
1476    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1477    /// let tick = process.tick();
1478    /// let numbers = process
1479    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1480    ///     .into_keyed();
1481    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1482    /// batch
1483    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1484    ///     .entries()
1485    ///     .all_ticks()
1486    /// # }, |mut stream| async move {
1487    /// // (1, 5), (2, 7)
1488    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1489    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1490    /// # }));
1491    /// ```
1492    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1493        self,
1494        init: impl IntoQuotedMut<'a, I, L>,
1495        comb: impl IntoQuotedMut<'a, F, L>,
1496    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1497        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1498            .fold(init, comb)
1499    }
1500
1501    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1502    ///
1503    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1504    ///
1505    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1506    ///
1507    /// # Example
1508    /// ```rust
1509    /// # use hydro_lang::prelude::*;
1510    /// # use futures::StreamExt;
1511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1512    /// let tick = process.tick();
1513    /// let numbers = process
1514    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1515    ///     .into_keyed();
1516    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1517    /// batch
1518    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1519    ///     .entries()
1520    ///     .all_ticks()
1521    /// # }, |mut stream| async move {
1522    /// // (1, 5), (2, 7)
1523    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1524    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1525    /// # }));
1526    /// ```
1527    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1528        self,
1529        comb: impl IntoQuotedMut<'a, F, L>,
1530    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1531        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1532            .reduce(comb)
1533    }
1534
1535    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1536    ///
1537    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1538    ///
1539    /// # Example
1540    /// ```rust
1541    /// # use hydro_lang::prelude::*;
1542    /// # use futures::StreamExt;
1543    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1544    /// let tick = process.tick();
1545    /// let watermark = tick.singleton(q!(1));
1546    /// let numbers = process
1547    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1548    ///     .into_keyed();
1549    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1550    /// batch
1551    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1552    ///     .entries()
1553    ///     .all_ticks()
1554    /// # }, |mut stream| async move {
1555    /// // (2, 204)
1556    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1557    /// # }));
1558    /// ```
1559    pub fn reduce_watermark_commutative<O2, F>(
1560        self,
1561        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1562        comb: impl IntoQuotedMut<'a, F, L>,
1563    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1564    where
1565        O2: Clone,
1566        F: Fn(&mut V, V) + 'a,
1567    {
1568        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1569            .reduce_watermark(other, comb)
1570    }
1571}
1572
1573impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1574where
1575    K: Eq + Hash,
1576    L: Location<'a>,
1577{
1578    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1579    ///
1580    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1581    ///
1582    /// If the input and output value types are the same and do not require initialization then use
1583    /// [`KeyedStream::reduce_idempotent`].
1584    ///
1585    /// # Example
1586    /// ```rust
1587    /// # use hydro_lang::prelude::*;
1588    /// # use futures::StreamExt;
1589    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1590    /// let tick = process.tick();
1591    /// let numbers = process
1592    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1593    ///     .into_keyed();
1594    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1595    /// batch
1596    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1597    ///     .entries()
1598    ///     .all_ticks()
1599    /// # }, |mut stream| async move {
1600    /// // (1, false), (2, true)
1601    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1602    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1603    /// # }));
1604    /// ```
1605    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1606        self,
1607        init: impl IntoQuotedMut<'a, I, L>,
1608        comb: impl IntoQuotedMut<'a, F, L>,
1609    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1610        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1611            .fold(init, comb)
1612    }
1613
1614    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1615    ///
1616    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1617    ///
1618    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1619    ///
1620    /// # Example
1621    /// ```rust
1622    /// # use hydro_lang::prelude::*;
1623    /// # use futures::StreamExt;
1624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1625    /// let tick = process.tick();
1626    /// let numbers = process
1627    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1628    ///     .into_keyed();
1629    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1630    /// batch
1631    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1632    ///     .entries()
1633    ///     .all_ticks()
1634    /// # }, |mut stream| async move {
1635    /// // (1, false), (2, true)
1636    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1637    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1638    /// # }));
1639    /// ```
1640    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1641        self,
1642        comb: impl IntoQuotedMut<'a, F, L>,
1643    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1644        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1645            .reduce(comb)
1646    }
1647
1648    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1649    ///
1650    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1651    ///
1652    /// # Example
1653    /// ```rust
1654    /// # use hydro_lang::prelude::*;
1655    /// # use futures::StreamExt;
1656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657    /// let tick = process.tick();
1658    /// let watermark = tick.singleton(q!(1));
1659    /// let numbers = process
1660    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1661    ///     .into_keyed();
1662    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1663    /// batch
1664    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1665    ///     .entries()
1666    ///     .all_ticks()
1667    /// # }, |mut stream| async move {
1668    /// // (2, true)
1669    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1670    /// # }));
1671    /// ```
1672    pub fn reduce_watermark_idempotent<O2, F>(
1673        self,
1674        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1675        comb: impl IntoQuotedMut<'a, F, L>,
1676    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1677    where
1678        O2: Clone,
1679        F: Fn(&mut V, V) + 'a,
1680    {
1681        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1682            .reduce_watermark(other, comb)
1683    }
1684}
1685
1686impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1687where
1688    K: Eq + Hash,
1689    L: Location<'a>,
1690{
1691    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1692    ///
1693    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1694    /// as there may be non-deterministic duplicates.
1695    ///
1696    /// If the input and output value types are the same and do not require initialization then use
1697    /// [`KeyedStream::reduce_commutative_idempotent`].
1698    ///
1699    /// # Example
1700    /// ```rust
1701    /// # use hydro_lang::prelude::*;
1702    /// # use futures::StreamExt;
1703    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1704    /// let tick = process.tick();
1705    /// let numbers = process
1706    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1707    ///     .into_keyed();
1708    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1709    /// batch
1710    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1711    ///     .entries()
1712    ///     .all_ticks()
1713    /// # }, |mut stream| async move {
1714    /// // (1, false), (2, true)
1715    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1716    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1717    /// # }));
1718    /// ```
1719    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1720        self,
1721        init: impl IntoQuotedMut<'a, I, L>,
1722        comb: impl IntoQuotedMut<'a, F, L>,
1723    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1724        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1725            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1726            .fold(init, comb)
1727    }
1728
1729    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1730    ///
1731    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1732    /// as there may be non-deterministic duplicates.
1733    ///
1734    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1735    ///
1736    /// # Example
1737    /// ```rust
1738    /// # use hydro_lang::prelude::*;
1739    /// # use futures::StreamExt;
1740    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1741    /// let tick = process.tick();
1742    /// let numbers = process
1743    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1744    ///     .into_keyed();
1745    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1746    /// batch
1747    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1748    ///     .entries()
1749    ///     .all_ticks()
1750    /// # }, |mut stream| async move {
1751    /// // (1, false), (2, true)
1752    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1753    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1754    /// # }));
1755    /// ```
1756    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1757        self,
1758        comb: impl IntoQuotedMut<'a, F, L>,
1759    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1760        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1761            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1762            .reduce(comb)
1763    }
1764
1765    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1766    ///
1767    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1768    /// as there may be non-deterministic duplicates.
1769    ///
1770    /// # Example
1771    /// ```rust
1772    /// # use hydro_lang::prelude::*;
1773    /// # use futures::StreamExt;
1774    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1775    /// let tick = process.tick();
1776    /// let watermark = tick.singleton(q!(1));
1777    /// let numbers = process
1778    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1779    ///     .into_keyed();
1780    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1781    /// batch
1782    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1783    ///     .entries()
1784    ///     .all_ticks()
1785    /// # }, |mut stream| async move {
1786    /// // (2, true)
1787    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1788    /// # }));
1789    /// ```
1790    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1791        self,
1792        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1793        comb: impl IntoQuotedMut<'a, F, L>,
1794    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1795    where
1796        O2: Clone,
1797        F: Fn(&mut V, V) + 'a,
1798    {
1799        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1800            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1801            .reduce_watermark(other, comb)
1802    }
1803
1804    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1805    /// whose keys are not in the bounded stream.
1806    ///
1807    /// # Example
1808    /// ```rust
1809    /// # use hydro_lang::prelude::*;
1810    /// # use futures::StreamExt;
1811    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1812    /// let tick = process.tick();
1813    /// let keyed_stream = process
1814    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1815    ///     .batch(&tick, nondet!(/** test */))
1816    ///     .into_keyed();
1817    /// let keys_to_remove = process
1818    ///     .source_iter(q!(vec![1, 2]))
1819    ///     .batch(&tick, nondet!(/** test */));
1820    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1821    /// #   .entries()
1822    /// # }, |mut stream| async move {
1823    /// // { 3: ['c'], 4: ['d'] }
1824    /// # for w in vec![(3, 'c'), (4, 'd')] {
1825    /// #     assert_eq!(stream.next().await.unwrap(), w);
1826    /// # }
1827    /// # }));
1828    /// ```
1829    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1830        self,
1831        other: Stream<K, L, Bounded, O2, R2>,
1832    ) -> Self {
1833        check_matching_location(&self.location, &other.location);
1834
1835        KeyedStream::new(
1836            self.location.clone(),
1837            HydroNode::AntiJoin {
1838                pos: Box::new(self.ir_node.into_inner()),
1839                neg: Box::new(other.ir_node.into_inner()),
1840                metadata: self.location.new_node_metadata(Self::collection_kind()),
1841            },
1842        )
1843    }
1844}
1845
1846impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1847where
1848    L: Location<'a>,
1849{
1850    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1851    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1852    ///
1853    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1854    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1855    /// argument that declares where the stream will be atomically processed. Batching a stream into
1856    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1857    /// [`Tick`] will introduce asynchrony.
1858    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1859        let out_location = Atomic { tick: tick.clone() };
1860        KeyedStream::new(
1861            out_location.clone(),
1862            HydroNode::BeginAtomic {
1863                inner: Box::new(self.ir_node.into_inner()),
1864                metadata: out_location
1865                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1866            },
1867        )
1868    }
1869
1870    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1871    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1872    /// the order of the input.
1873    ///
1874    /// # Non-Determinism
1875    /// The batch boundaries are non-deterministic and may change across executions.
1876    pub fn batch(
1877        self,
1878        tick: &Tick<L>,
1879        nondet: NonDet,
1880    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1881        let _ = nondet;
1882        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1883        KeyedStream::new(
1884            tick.clone(),
1885            HydroNode::Batch {
1886                inner: Box::new(self.ir_node.into_inner()),
1887                metadata: tick.new_node_metadata(
1888                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1889                ),
1890            },
1891        )
1892    }
1893}
1894
1895impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1896where
1897    L: Location<'a> + NoTick,
1898{
1899    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1900    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1901    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1902    /// used to create the atomic section.
1903    ///
1904    /// # Non-Determinism
1905    /// The batch boundaries are non-deterministic and may change across executions.
1906    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1907        let _ = nondet;
1908        KeyedStream::new(
1909            self.location.clone().tick,
1910            HydroNode::Batch {
1911                inner: Box::new(self.ir_node.into_inner()),
1912                metadata: self.location.tick.new_node_metadata(KeyedStream::<
1913                    K,
1914                    V,
1915                    Tick<L>,
1916                    Bounded,
1917                    O,
1918                    R,
1919                >::collection_kind(
1920                )),
1921            },
1922        )
1923    }
1924
1925    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1926    /// See [`KeyedStream::atomic`] for more details.
1927    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1928        KeyedStream::new(
1929            self.location.tick.l.clone(),
1930            HydroNode::EndAtomic {
1931                inner: Box::new(self.ir_node.into_inner()),
1932                metadata: self
1933                    .location
1934                    .tick
1935                    .l
1936                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1937            },
1938        )
1939    }
1940}
1941
1942impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1943where
1944    L: Location<'a>,
1945{
1946    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1947    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1948    /// is only present in one of the inputs, its values are passed through as-is). The output has
1949    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1950    ///
1951    /// Currently, both input streams must be [`Bounded`]. This operator will block
1952    /// on the first stream until all its elements are available. In a future version,
1953    /// we will relax the requirement on the `other` stream.
1954    ///
1955    /// # Example
1956    /// ```rust
1957    /// # use hydro_lang::prelude::*;
1958    /// # use futures::StreamExt;
1959    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1960    /// let tick = process.tick();
1961    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1962    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1963    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1964    /// # .entries()
1965    /// # }, |mut stream| async move {
1966    /// // { 0: [2, 1], 1: [4, 3] }
1967    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1968    /// #     assert_eq!(stream.next().await.unwrap(), w);
1969    /// # }
1970    /// # }));
1971    /// ```
1972    pub fn chain<O2: Ordering, R2: Retries>(
1973        self,
1974        other: KeyedStream<K, V, L, Bounded, O2, R2>,
1975    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1976    where
1977        O: MinOrder<O2>,
1978        R: MinRetries<R2>,
1979    {
1980        check_matching_location(&self.location, &other.location);
1981
1982        KeyedStream::new(
1983            self.location.clone(),
1984            HydroNode::Chain {
1985                first: Box::new(self.ir_node.into_inner()),
1986                second: Box::new(other.ir_node.into_inner()),
1987                metadata: self.location.new_node_metadata(KeyedStream::<
1988                    K,
1989                    V,
1990                    L,
1991                    Bounded,
1992                    <O as MinOrder<O2>>::Min,
1993                    <R as MinRetries<R2>>::Min,
1994                >::collection_kind()),
1995            },
1996        )
1997    }
1998}
1999
2000impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2001where
2002    L: Location<'a>,
2003{
2004    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2005    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2006    /// each key.
2007    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2008        KeyedStream::new(
2009            self.location.outer().clone(),
2010            HydroNode::YieldConcat {
2011                inner: Box::new(self.ir_node.into_inner()),
2012                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2013                    K,
2014                    V,
2015                    L,
2016                    Unbounded,
2017                    O,
2018                    R,
2019                >::collection_kind(
2020                )),
2021            },
2022        )
2023    }
2024
2025    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2026    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2027    /// each key.
2028    ///
2029    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2030    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2031    /// stream's [`Tick`] context.
2032    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2033        let out_location = Atomic {
2034            tick: self.location.clone(),
2035        };
2036
2037        KeyedStream::new(
2038            out_location.clone(),
2039            HydroNode::YieldConcat {
2040                inner: Box::new(self.ir_node.into_inner()),
2041                metadata: out_location.new_node_metadata(KeyedStream::<
2042                    K,
2043                    V,
2044                    Atomic<L>,
2045                    Unbounded,
2046                    O,
2047                    R,
2048                >::collection_kind()),
2049            },
2050        )
2051    }
2052
2053    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2054    /// tick `T` always has the entries of `self` at tick `T - 1`.
2055    ///
2056    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2057    ///
2058    /// This operator enables stateful iterative processing with ticks, by sending data from one
2059    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2060    ///
2061    /// # Example
2062    /// ```rust
2063    /// # use hydro_lang::prelude::*;
2064    /// # use futures::StreamExt;
2065    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2066    /// let tick = process.tick();
2067    /// # // ticks are lazy by default, forces the second tick to run
2068    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2069    /// # let batch_first_tick = process
2070    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2071    /// #   .batch(&tick, nondet!(/** test */))
2072    /// #   .into_keyed();
2073    /// # let batch_second_tick = process
2074    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2075    /// #   .batch(&tick, nondet!(/** test */))
2076    /// #   .defer_tick()
2077    /// #   .into_keyed(); // appears on the second tick
2078    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2079    /// # batch_first_tick.chain(batch_second_tick);
2080    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2081    ///     changes_across_ticks // from the current tick
2082    /// )
2083    /// # .entries().all_ticks()
2084    /// # }, |mut stream| async move {
2085    /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2086    /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2087    /// #     assert_eq!(stream.next().await.unwrap(), w);
2088    /// # }
2089    /// # }));
2090    /// ```
2091    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2092        KeyedStream::new(
2093            self.location.clone(),
2094            HydroNode::DeferTick {
2095                input: Box::new(self.ir_node.into_inner()),
2096                metadata: self.location.new_node_metadata(KeyedStream::<
2097                    K,
2098                    V,
2099                    Tick<L>,
2100                    Bounded,
2101                    O,
2102                    R,
2103                >::collection_kind()),
2104            },
2105        )
2106    }
2107}
2108
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Runs `reduce_watermark` with a fixed watermark of `1` on a local deployment and checks
    /// that the first entry received is the summed group for key `2`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external = builder.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        let sum_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the receiver before starting so no output is missed.
        let mut results = deployed.connect(sum_port).await;

        deployment.start().await.unwrap();

        assert_eq!(results.next().await.unwrap(), (2, 204));
    }

    /// Drives `reduce_watermark_commutative` with a watermark that starts at `1` and grows by
    /// one on every tick, then injects a late `(3, 103)` entry via an external trigger and checks
    /// that it is the next entry received after `(2, 204)`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external = builder.external::<()>();
        let (trigger_port, trigger_stream) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        // Cycle: the watermark of the next tick is the current watermark plus one.
        let (watermark_cycle, watermark) = tick.cycle_with_initial(tick.singleton(q!(1)));
        let incremented_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_cycle.complete_next_tick(incremented_watermark);

        // `(3, 103)` is only released in a tick where the external trigger delivered a message.
        let triggered_input = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */))
            .filter_if_some(
                trigger_stream
                    .clone()
                    .batch(&tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(triggered_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = builder
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = deployed.connect(trigger_port).await;
        let mut results = deployed.connect(sum_port).await;

        deployment.start().await.unwrap();

        assert_eq!(results.next().await.unwrap(), (2, 204));

        trigger_sink.send(()).await.unwrap();

        assert_eq!(results.next().await.unwrap(), (3, 103));
    }

    /// Batch boundaries are non-deterministic, so asserting one fixed grouping of the folded
    /// output is expected to panic under exhaustive simulation.
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let builder = FlowBuilder::new();
        let external = builder.external::<()>();
        let process = builder.process::<()>();

        let keyed_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed();

        let tick = process.tick();
        let folded_port = keyed_input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        builder.sim().exhaustive(async |mut sim| {
            let receiver = sim.connect(&folded_port);
            sim.launch();

            receiver
                .assert_yields_only_unordered([(1, vec![1, 2])])
                .await;
        });
    }

    /// Batching an ordered keyed stream keeps the order within each group; the number of
    /// explored simulation instances is pinned to the enumeration listed at the end.
    #[test]
    fn sim_batch_preserves_group_order() {
        let builder = FlowBuilder::new();
        let external = builder.external::<()>();
        let process = builder.process::<()>();

        let keyed_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed();

        let tick = process.tick();
        let max_port = keyed_input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let explored = builder.sim().exhaustive(async |mut sim| {
            let receiver = sim.connect(&max_port);
            sim.launch();

            receiver
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(explored, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }

    /// With the weakest ordering, the simulator also explores reorderings within a group; the
    /// number of explored instances is pinned to the enumeration listed at the end.
    #[test]
    fn sim_batch_unordered_shuffles() {
        let builder = FlowBuilder::new();
        let external = builder.external::<()>();
        let process = builder.process::<()>();

        let unordered_input = process
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weakest_ordering();

        let tick = process.tick();
        let entries_port = unordered_input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .send_bincode_external(&external);

        let explored = builder.sim().exhaustive(async |mut sim| {
            let receiver = sim.connect(&entries_port);
            sim.launch();

            receiver
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(explored, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
}