hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22use crate::live_collections::stream::{Ordering, Retries};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::DeferTick;
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30pub mod networking;
31
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`]
///   (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`]); defaults to [`TotalOrder`]
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries
///   ([`crate::live_collections::stream::AtLeastOnce`]); defaults to [`ExactlyOnce`]
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location at which this keyed stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this collection. It lives in a `RefCell` so that it can be
    /// replaced through a shared reference (the `Clone` implementation swaps it for a tee node).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
63
64impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
65 for KeyedStream<K, V, L, B, NoOrder, R>
66where
67 L: Location<'a>,
68{
69 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
70 KeyedStream {
71 location: stream.location,
72 ir_node: stream.ir_node,
73 _phantom: PhantomData,
74 }
75 }
76}
77
/// Makes tick-local, bounded keyed streams usable wherever a [`DeferTick`] collection is expected.
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Forwards to `KeyedStream::defer_tick`.
    ///
    /// NOTE(review): that method is not visible in this part of the file; presumably it is the
    /// inherent `defer_tick` defined further down, which the `KeyedStream::` path selects in
    /// preference to this trait method — confirm before changing the call form.
    fn defer_tick(self) -> Self {
        KeyedStream::defer_tick(self)
    }
}
86
87impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
88 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
89where
90 L: Location<'a>,
91{
92 type Location = Tick<L>;
93
94 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95 KeyedStream {
96 location: location.clone(),
97 ir_node: RefCell::new(HydroNode::CycleSource {
98 ident,
99 metadata: location.new_node_metadata(
100 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
101 ),
102 }),
103 _phantom: PhantomData,
104 }
105 }
106}
107
108impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
109 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
132 for KeyedStream<K, V, L, B, O, R>
133where
134 L: Location<'a> + NoTick,
135{
136 type Location = L;
137
138 fn create_source(ident: syn::Ident, location: L) -> Self {
139 KeyedStream {
140 location: location.clone(),
141 ir_node: RefCell::new(HydroNode::CycleSource {
142 ident,
143 metadata: location
144 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
152 for KeyedStream<K, V, L, B, O, R>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
174 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
175{
176 fn clone(&self) -> Self {
177 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
178 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
179 *self.ir_node.borrow_mut() = HydroNode::Tee {
180 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
181 metadata: self.location.new_node_metadata(Self::collection_kind()),
182 };
183 }
184
185 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
186 KeyedStream {
187 location: self.location.clone(),
188 ir_node: HydroNode::Tee {
189 inner: TeeNode(inner.0.clone()),
190 metadata: metadata.clone(),
191 }
192 .into(),
193 _phantom: PhantomData,
194 }
195 } else {
196 unreachable!()
197 }
198 }
199}
200
201impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
202 KeyedStream<K, V, L, B, O, R>
203{
204 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
205 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
206 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
207
208 KeyedStream {
209 location,
210 ir_node: RefCell::new(ir_node),
211 _phantom: PhantomData,
212 }
213 }
214
215 /// Returns the [`CollectionKind`] corresponding to this type.
216 pub fn collection_kind() -> CollectionKind {
217 CollectionKind::KeyedStream {
218 bound: B::BOUND_KIND,
219 value_order: O::ORDERING_KIND,
220 value_retry: R::RETRIES_KIND,
221 key_type: stageleft::quote_type::<K>().into(),
222 value_type: stageleft::quote_type::<V>().into(),
223 }
224 }
225
226 /// Returns the [`Location`] where this keyed stream is being materialized.
227 pub fn location(&self) -> &L {
228 &self.location
229 }
230
231 /// Explicitly "casts" the keyed stream to a type with a different ordering
232 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
233 /// by the type-system.
234 ///
235 /// # Non-Determinism
236 /// This function is used as an escape hatch, and any mistakes in the
237 /// provided ordering guarantee will propagate into the guarantees
238 /// for the rest of the program.
239 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
240 if O::ORDERING_KIND == O2::ORDERING_KIND {
241 KeyedStream::new(self.location, self.ir_node.into_inner())
242 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
243 // We can always weaken the ordering guarantee
244 KeyedStream::new(
245 self.location.clone(),
246 HydroNode::Cast {
247 inner: Box::new(self.ir_node.into_inner()),
248 metadata: self
249 .location
250 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
251 },
252 )
253 } else {
254 KeyedStream::new(
255 self.location.clone(),
256 HydroNode::ObserveNonDet {
257 inner: Box::new(self.ir_node.into_inner()),
258 trusted: false,
259 metadata: self
260 .location
261 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
262 },
263 )
264 }
265 }
266
267 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
268 /// which is always safe because that is the weakest possible guarantee.
269 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
270 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
271 self.assume_ordering::<NoOrder>(nondet)
272 }
273
274 /// Explicitly "casts" the keyed stream to a type with a different retries
275 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
276 /// be proven by the type-system.
277 ///
278 /// # Non-Determinism
279 /// This function is used as an escape hatch, and any mistakes in the
280 /// provided retries guarantee will propagate into the guarantees
281 /// for the rest of the program.
282 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
283 if R::RETRIES_KIND == R2::RETRIES_KIND {
284 KeyedStream::new(self.location, self.ir_node.into_inner())
285 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
286 // We can always weaken the retries guarantee
287 KeyedStream::new(
288 self.location.clone(),
289 HydroNode::Cast {
290 inner: Box::new(self.ir_node.into_inner()),
291 metadata: self
292 .location
293 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
294 },
295 )
296 } else {
297 KeyedStream::new(
298 self.location.clone(),
299 HydroNode::ObserveNonDet {
300 inner: Box::new(self.ir_node.into_inner()),
301 trusted: false,
302 metadata: self
303 .location
304 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
305 },
306 )
307 }
308 }
309
310 /// Flattens the keyed stream into an unordered stream of key-value pairs.
311 ///
312 /// # Example
313 /// ```rust
314 /// # use hydro_lang::prelude::*;
315 /// # use futures::StreamExt;
316 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
317 /// process
318 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
319 /// .into_keyed()
320 /// .entries()
321 /// # }, |mut stream| async move {
322 /// // (1, 2), (1, 3), (2, 4) in any order
323 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
324 /// # assert_eq!(stream.next().await.unwrap(), w);
325 /// # }
326 /// # }));
327 /// ```
328 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
329 Stream::new(
330 self.location.clone(),
331 HydroNode::Cast {
332 inner: Box::new(self.ir_node.into_inner()),
333 metadata: self
334 .location
335 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
336 },
337 )
338 }
339
340 /// Flattens the keyed stream into an unordered stream of only the values.
341 ///
342 /// # Example
343 /// ```rust
344 /// # use hydro_lang::prelude::*;
345 /// # use futures::StreamExt;
346 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
347 /// process
348 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
349 /// .into_keyed()
350 /// .values()
351 /// # }, |mut stream| async move {
352 /// // 2, 3, 4 in any order
353 /// # for w in vec![2, 3, 4] {
354 /// # assert_eq!(stream.next().await.unwrap(), w);
355 /// # }
356 /// # }));
357 /// ```
358 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
359 self.entries().map(q!(|(_, v)| v))
360 }
361
362 /// Transforms each value by invoking `f` on each element, with keys staying the same
363 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
364 ///
365 /// If you do not want to modify the stream and instead only want to view
366 /// each item use [`KeyedStream::inspect`] instead.
367 ///
368 /// # Example
369 /// ```rust
370 /// # use hydro_lang::prelude::*;
371 /// # use futures::StreamExt;
372 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
373 /// process
374 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
375 /// .into_keyed()
376 /// .map(q!(|v| v + 1))
377 /// # .entries()
378 /// # }, |mut stream| async move {
379 /// // { 1: [3, 4], 2: [5] }
380 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
381 /// # assert_eq!(stream.next().await.unwrap(), w);
382 /// # }
383 /// # }));
384 /// ```
385 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
386 where
387 F: Fn(V) -> U + 'a,
388 {
389 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
390 let map_f = q!({
391 let orig = f;
392 move |(k, v)| (k, orig(v))
393 })
394 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
395 .into();
396
397 KeyedStream::new(
398 self.location.clone(),
399 HydroNode::Map {
400 f: map_f,
401 input: Box::new(self.ir_node.into_inner()),
402 metadata: self
403 .location
404 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
405 },
406 )
407 }
408
409 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
410 /// re-grouped even they are tuples; instead they will be grouped under the original key.
411 ///
412 /// If you do not want to modify the stream and instead only want to view
413 /// each item use [`KeyedStream::inspect_with_key`] instead.
414 ///
415 /// # Example
416 /// ```rust
417 /// # use hydro_lang::prelude::*;
418 /// # use futures::StreamExt;
419 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
420 /// process
421 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
422 /// .into_keyed()
423 /// .map_with_key(q!(|(k, v)| k + v))
424 /// # .entries()
425 /// # }, |mut stream| async move {
426 /// // { 1: [3, 4], 2: [6] }
427 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
428 /// # assert_eq!(stream.next().await.unwrap(), w);
429 /// # }
430 /// # }));
431 /// ```
432 pub fn map_with_key<U, F>(
433 self,
434 f: impl IntoQuotedMut<'a, F, L> + Copy,
435 ) -> KeyedStream<K, U, L, B, O, R>
436 where
437 F: Fn((K, V)) -> U + 'a,
438 K: Clone,
439 {
440 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
441 let map_f = q!({
442 let orig = f;
443 move |(k, v)| {
444 let out = orig((Clone::clone(&k), v));
445 (k, out)
446 }
447 })
448 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
449 .into();
450
451 KeyedStream::new(
452 self.location.clone(),
453 HydroNode::Map {
454 f: map_f,
455 input: Box::new(self.ir_node.into_inner()),
456 metadata: self
457 .location
458 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
459 },
460 )
461 }
462
463 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
464 /// `f`, preserving the order of the elements within the group.
465 ///
466 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
467 /// not modify or take ownership of the values. If you need to modify the values while filtering
468 /// use [`KeyedStream::filter_map`] instead.
469 ///
470 /// # Example
471 /// ```rust
472 /// # use hydro_lang::prelude::*;
473 /// # use futures::StreamExt;
474 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
475 /// process
476 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
477 /// .into_keyed()
478 /// .filter(q!(|&x| x > 2))
479 /// # .entries()
480 /// # }, |mut stream| async move {
481 /// // { 1: [3], 2: [4] }
482 /// # for w in vec![(1, 3), (2, 4)] {
483 /// # assert_eq!(stream.next().await.unwrap(), w);
484 /// # }
485 /// # }));
486 /// ```
487 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
488 where
489 F: Fn(&V) -> bool + 'a,
490 {
491 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
492 let filter_f = q!({
493 let orig = f;
494 move |t: &(_, _)| orig(&t.1)
495 })
496 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
497 .into();
498
499 KeyedStream::new(
500 self.location.clone(),
501 HydroNode::Filter {
502 f: filter_f,
503 input: Box::new(self.ir_node.into_inner()),
504 metadata: self.location.new_node_metadata(Self::collection_kind()),
505 },
506 )
507 }
508
509 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
510 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
511 ///
512 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
513 /// not modify or take ownership of the values. If you need to modify the values while filtering
514 /// use [`KeyedStream::filter_map_with_key`] instead.
515 ///
516 /// # Example
517 /// ```rust
518 /// # use hydro_lang::prelude::*;
519 /// # use futures::StreamExt;
520 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
521 /// process
522 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
523 /// .into_keyed()
524 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
525 /// # .entries()
526 /// # }, |mut stream| async move {
527 /// // { 1: [3], 2: [4] }
528 /// # for w in vec![(1, 3), (2, 4)] {
529 /// # assert_eq!(stream.next().await.unwrap(), w);
530 /// # }
531 /// # }));
532 /// ```
533 pub fn filter_with_key<F>(
534 self,
535 f: impl IntoQuotedMut<'a, F, L> + Copy,
536 ) -> KeyedStream<K, V, L, B, O, R>
537 where
538 F: Fn(&(K, V)) -> bool + 'a,
539 {
540 let filter_f = f
541 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
542 .into();
543
544 KeyedStream::new(
545 self.location.clone(),
546 HydroNode::Filter {
547 f: filter_f,
548 input: Box::new(self.ir_node.into_inner()),
549 metadata: self.location.new_node_metadata(Self::collection_kind()),
550 },
551 )
552 }
553
554 /// An operator that both filters and maps each value, with keys staying the same.
555 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
556 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
557 ///
558 /// # Example
559 /// ```rust
560 /// # use hydro_lang::prelude::*;
561 /// # use futures::StreamExt;
562 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
563 /// process
564 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
565 /// .into_keyed()
566 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
567 /// # .entries()
568 /// # }, |mut stream| async move {
569 /// // { 1: [2], 2: [4] }
570 /// # for w in vec![(1, 2), (2, 4)] {
571 /// # assert_eq!(stream.next().await.unwrap(), w);
572 /// # }
573 /// # }));
574 /// ```
575 pub fn filter_map<U, F>(
576 self,
577 f: impl IntoQuotedMut<'a, F, L> + Copy,
578 ) -> KeyedStream<K, U, L, B, O, R>
579 where
580 F: Fn(V) -> Option<U> + 'a,
581 {
582 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
583 let filter_map_f = q!({
584 let orig = f;
585 move |(k, v)| orig(v).map(|o| (k, o))
586 })
587 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
588 .into();
589
590 KeyedStream::new(
591 self.location.clone(),
592 HydroNode::FilterMap {
593 f: filter_map_f,
594 input: Box::new(self.ir_node.into_inner()),
595 metadata: self
596 .location
597 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
598 },
599 )
600 }
601
602 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
603 /// re-grouped even they are tuples; instead they will be grouped under the original key.
604 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
605 ///
606 /// # Example
607 /// ```rust
608 /// # use hydro_lang::prelude::*;
609 /// # use futures::StreamExt;
610 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611 /// process
612 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
613 /// .into_keyed()
614 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
615 /// # .entries()
616 /// # }, |mut stream| async move {
617 /// // { 2: [2] }
618 /// # for w in vec![(2, 2)] {
619 /// # assert_eq!(stream.next().await.unwrap(), w);
620 /// # }
621 /// # }));
622 /// ```
623 pub fn filter_map_with_key<U, F>(
624 self,
625 f: impl IntoQuotedMut<'a, F, L> + Copy,
626 ) -> KeyedStream<K, U, L, B, O, R>
627 where
628 F: Fn((K, V)) -> Option<U> + 'a,
629 K: Clone,
630 {
631 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
632 let filter_map_f = q!({
633 let orig = f;
634 move |(k, v)| {
635 let out = orig((Clone::clone(&k), v));
636 out.map(|o| (k, o))
637 }
638 })
639 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
640 .into();
641
642 KeyedStream::new(
643 self.location.clone(),
644 HydroNode::FilterMap {
645 f: filter_map_f,
646 input: Box::new(self.ir_node.into_inner()),
647 metadata: self
648 .location
649 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
650 },
651 )
652 }
653
654 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
655 /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
656 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
657 ///
658 /// # Example
659 /// ```rust
660 /// # use hydro_lang::prelude::*;
661 /// # use futures::StreamExt;
662 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663 /// let tick = process.tick();
664 /// let batch = process
665 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
666 /// .into_keyed()
667 /// .batch(&tick, nondet!(/** test */));
668 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
669 /// batch.cross_singleton(count).all_ticks().entries()
670 /// # }, |mut stream| async move {
671 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
672 /// # for w in vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))] {
673 /// # assert_eq!(stream.next().await.unwrap(), w);
674 /// # }
675 /// # }));
676 /// ```
677 pub fn cross_singleton<O2>(
678 self,
679 other: impl Into<Optional<O2, L, Bounded>>,
680 ) -> KeyedStream<K, (V, O2), L, B, O, R>
681 where
682 O2: Clone,
683 {
684 let other: Optional<O2, L, Bounded> = other.into();
685 check_matching_location(&self.location, &other.location);
686
687 Stream::new(
688 self.location.clone(),
689 HydroNode::CrossSingleton {
690 left: Box::new(self.ir_node.into_inner()),
691 right: Box::new(other.ir_node.into_inner()),
692 metadata: self
693 .location
694 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
695 },
696 )
697 .map(q!(|((k, v), o2)| (k, (v, o2))))
698 .into_keyed()
699 }
700
701 /// For each value `v` in each group, transform `v` using `f` and then treat the
702 /// result as an [`Iterator`] to produce values one by one within the same group.
703 /// The implementation for [`Iterator`] for the output type `I` must produce items
704 /// in a **deterministic** order.
705 ///
706 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
707 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
708 ///
709 /// # Example
710 /// ```rust
711 /// # use hydro_lang::prelude::*;
712 /// # use futures::StreamExt;
713 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714 /// process
715 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
716 /// .into_keyed()
717 /// .flat_map_ordered(q!(|x| x))
718 /// # .entries()
719 /// # }, |mut stream| async move {
720 /// // { 1: [2, 3, 4], 2: [5, 6] }
721 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
722 /// # assert_eq!(stream.next().await.unwrap(), w);
723 /// # }
724 /// # }));
725 /// ```
726 pub fn flat_map_ordered<U, I, F>(
727 self,
728 f: impl IntoQuotedMut<'a, F, L> + Copy,
729 ) -> KeyedStream<K, U, L, B, O, R>
730 where
731 I: IntoIterator<Item = U>,
732 F: Fn(V) -> I + 'a,
733 K: Clone,
734 {
735 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
736 let flat_map_f = q!({
737 let orig = f;
738 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
739 })
740 .splice_fn1_ctx::<(K, V), _>(&self.location)
741 .into();
742
743 KeyedStream::new(
744 self.location.clone(),
745 HydroNode::FlatMap {
746 f: flat_map_f,
747 input: Box::new(self.ir_node.into_inner()),
748 metadata: self
749 .location
750 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
751 },
752 )
753 }
754
755 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
756 /// for the output type `I` to produce items in any order.
757 ///
758 /// # Example
759 /// ```rust
760 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
761 /// # use futures::StreamExt;
762 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
763 /// process
764 /// .source_iter(q!(vec![
765 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
766 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
767 /// ]))
768 /// .into_keyed()
769 /// .flat_map_unordered(q!(|x| x))
770 /// # .entries()
771 /// # }, |mut stream| async move {
772 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
773 /// # let mut results = Vec::new();
774 /// # for _ in 0..4 {
775 /// # results.push(stream.next().await.unwrap());
776 /// # }
777 /// # results.sort();
778 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
779 /// # }));
780 /// ```
781 pub fn flat_map_unordered<U, I, F>(
782 self,
783 f: impl IntoQuotedMut<'a, F, L> + Copy,
784 ) -> KeyedStream<K, U, L, B, NoOrder, R>
785 where
786 I: IntoIterator<Item = U>,
787 F: Fn(V) -> I + 'a,
788 K: Clone,
789 {
790 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
791 let flat_map_f = q!({
792 let orig = f;
793 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
794 })
795 .splice_fn1_ctx::<(K, V), _>(&self.location)
796 .into();
797
798 KeyedStream::new(
799 self.location.clone(),
800 HydroNode::FlatMap {
801 f: flat_map_f,
802 input: Box::new(self.ir_node.into_inner()),
803 metadata: self
804 .location
805 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
806 },
807 )
808 }
809
810 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
811 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
812 /// items in a **deterministic** order.
813 ///
814 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
815 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
816 ///
817 /// # Example
818 /// ```rust
819 /// # use hydro_lang::prelude::*;
820 /// # use futures::StreamExt;
821 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
822 /// process
823 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
824 /// .into_keyed()
825 /// .flatten_ordered()
826 /// # .entries()
827 /// # }, |mut stream| async move {
828 /// // { 1: [2, 3, 4], 2: [5, 6] }
829 /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
830 /// # assert_eq!(stream.next().await.unwrap(), w);
831 /// # }
832 /// # }));
833 /// ```
834 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
835 where
836 V: IntoIterator<Item = U>,
837 K: Clone,
838 {
839 self.flat_map_ordered(q!(|d| d))
840 }
841
842 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
843 /// for the value type `V` to produce items in any order.
844 ///
845 /// # Example
846 /// ```rust
847 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
848 /// # use futures::StreamExt;
849 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
850 /// process
851 /// .source_iter(q!(vec![
852 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
853 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
854 /// ]))
855 /// .into_keyed()
856 /// .flatten_unordered()
857 /// # .entries()
858 /// # }, |mut stream| async move {
859 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
860 /// # let mut results = Vec::new();
861 /// # for _ in 0..4 {
862 /// # results.push(stream.next().await.unwrap());
863 /// # }
864 /// # results.sort();
865 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
866 /// # }));
867 /// ```
868 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
869 where
870 V: IntoIterator<Item = U>,
871 K: Clone,
872 {
873 self.flat_map_unordered(q!(|d| d))
874 }
875
876 /// An operator which allows you to "inspect" each element of a stream without
877 /// modifying it. The closure `f` is called on a reference to each value. This is
878 /// mainly useful for debugging, and should not be used to generate side-effects.
879 ///
880 /// # Example
881 /// ```rust
882 /// # use hydro_lang::prelude::*;
883 /// # use futures::StreamExt;
884 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
885 /// process
886 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
887 /// .into_keyed()
888 /// .inspect(q!(|v| println!("{}", v)))
889 /// # .entries()
890 /// # }, |mut stream| async move {
891 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
892 /// # assert_eq!(stream.next().await.unwrap(), w);
893 /// # }
894 /// # }));
895 /// ```
896 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
897 where
898 F: Fn(&V) + 'a,
899 {
900 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
901 let inspect_f = q!({
902 let orig = f;
903 move |t: &(_, _)| orig(&t.1)
904 })
905 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
906 .into();
907
908 KeyedStream::new(
909 self.location.clone(),
910 HydroNode::Inspect {
911 f: inspect_f,
912 input: Box::new(self.ir_node.into_inner()),
913 metadata: self.location.new_node_metadata(Self::collection_kind()),
914 },
915 )
916 }
917
918 /// An operator which allows you to "inspect" each element of a stream without
919 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
920 /// mainly useful for debugging, and should not be used to generate side-effects.
921 ///
922 /// # Example
923 /// ```rust
924 /// # use hydro_lang::prelude::*;
925 /// # use futures::StreamExt;
926 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
927 /// process
928 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
929 /// .into_keyed()
930 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
931 /// # .entries()
932 /// # }, |mut stream| async move {
933 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
934 /// # assert_eq!(stream.next().await.unwrap(), w);
935 /// # }
936 /// # }));
937 /// ```
938 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
939 where
940 F: Fn(&(K, V)) + 'a,
941 {
942 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
943
944 KeyedStream::new(
945 self.location.clone(),
946 HydroNode::Inspect {
947 f: inspect_f,
948 input: Box::new(self.ir_node.into_inner()),
949 metadata: self.location.new_node_metadata(Self::collection_kind()),
950 },
951 )
952 }
953
954 /// An operator which allows you to "name" a `HydroNode`.
955 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
956 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
957 {
958 let mut node = self.ir_node.borrow_mut();
959 let metadata = node.metadata_mut();
960 metadata.tag = Some(name.to_string());
961 }
962 self
963 }
964}
965
966impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
967 KeyedStream<K, V, L, Unbounded, O, R>
968{
969 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
970 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
971 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
972 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
973 ///
974 /// Currently, both input streams must be [`Unbounded`].
975 ///
976 /// # Example
977 /// ```rust
978 /// # use hydro_lang::prelude::*;
979 /// # use futures::StreamExt;
980 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
981 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
982 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
983 /// numbers1.interleave(numbers2)
984 /// # .entries()
985 /// # }, |mut stream| async move {
986 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
987 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
988 /// # assert_eq!(stream.next().await.unwrap(), w);
989 /// # }
990 /// # }));
991 /// ```
992 pub fn interleave<O2: Ordering, R2: Retries>(
993 self,
994 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
995 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
996 where
997 R: MinRetries<R2>,
998 {
999 let tick = self.location.tick();
1000 // Because the outputs are unordered, we can interleave batches from both streams.
1001 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1002 self.batch(&tick, nondet_batch_interleaving)
1003 .weakest_ordering()
1004 .chain(
1005 other
1006 .batch(&tick, nondet_batch_interleaving)
1007 .weakest_ordering(),
1008 )
1009 .all_ticks()
1010 }
1011}
1012
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant answers two questions: whether an element is emitted for the current input,
/// and whether later inputs of the same group are still processed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1025
1026impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1027where
1028 K: Eq + Hash,
1029 L: Location<'a>,
1030{
1031 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1032 ///
1033 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
1034 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1035 /// early by returning `None`.
1036 ///
1037 /// The function takes a mutable reference to the accumulator and the current element, and returns
1038 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
/// If the function returns `None`, that key's group is terminated and no more of its elements
/// are processed; other groups are unaffected.
1040 ///
1041 /// # Example
1042 /// ```rust
1043 /// # use hydro_lang::prelude::*;
1044 /// # use futures::StreamExt;
1045 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046 /// process
1047 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1048 /// .into_keyed()
1049 /// .scan(
1050 /// q!(|| 0),
1051 /// q!(|acc, x| {
1052 /// *acc += x;
1053 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1054 /// }),
1055 /// )
1056 /// # .entries()
1057 /// # }, |mut stream| async move {
1058 /// // Output: { 0: [1], 1: [3, 7] }
1059 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
1060 /// # assert_eq!(stream.next().await.unwrap(), w);
1061 /// # }
1062 /// # }));
1063 /// ```
1064 pub fn scan<A, U, I, F>(
1065 self,
1066 init: impl IntoQuotedMut<'a, I, L> + Copy,
1067 f: impl IntoQuotedMut<'a, F, L> + Copy,
1068 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1069 where
1070 K: Clone,
1071 I: Fn() -> A + 'a,
1072 F: Fn(&mut A, V) -> Option<U> + 'a,
1073 {
1074 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1075 self.generator(
1076 init,
1077 q!({
1078 let orig = f;
1079 move |state, v| {
1080 if let Some(out) = orig(state, v) {
1081 Generate::Yield(out)
1082 } else {
1083 Generate::Break
1084 }
1085 }
1086 }),
1087 )
1088 }
1089
1090 /// Iteratively processes the elements in each group using a state machine that can yield
1091 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1092 /// syntax in Rust, without requiring special syntax.
1093 ///
1094 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1095 /// state for each group. The second argument defines the processing logic, taking in a
1096 /// mutable reference to the group's state and the value to be processed. It emits a
1097 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1098 /// should be processed.
1099 ///
1100 /// # Example
1101 /// ```rust
1102 /// # use hydro_lang::prelude::*;
1103 /// # use futures::StreamExt;
1104 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1105 /// process
1106 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1107 /// .into_keyed()
1108 /// .generator(
1109 /// q!(|| 0),
1110 /// q!(|acc, x| {
1111 /// *acc += x;
1112 /// if *acc > 100 {
1113 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1114 /// "done!".to_string()
1115 /// )
1116 /// } else if *acc % 2 == 0 {
1117 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1118 /// "even".to_string()
1119 /// )
1120 /// } else {
1121 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1122 /// }
1123 /// }),
1124 /// )
1125 /// # .entries()
1126 /// # }, |mut stream| async move {
1127 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1128 /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
1129 /// # assert_eq!(stream.next().await.unwrap(), w);
1130 /// # }
1131 /// # }));
1132 /// ```
1133 pub fn generator<A, U, I, F>(
1134 self,
1135 init: impl IntoQuotedMut<'a, I, L> + Copy,
1136 f: impl IntoQuotedMut<'a, F, L> + Copy,
1137 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1138 where
1139 K: Clone,
1140 I: Fn() -> A + 'a,
1141 F: Fn(&mut A, V) -> Generate<U> + 'a,
1142 {
1143 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1144 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1145
1146 let scan_init = q!(|| HashMap::new())
1147 .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1148 .into();
1149 let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1150 let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1151 if let Some(existing_state_value) = existing_state {
1152 match f(existing_state_value, v) {
1153 Generate::Yield(out) => Some(Some((k, out))),
1154 Generate::Return(out) => {
1155 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1156 Some(Some((k, out)))
1157 }
1158 Generate::Break => {
1159 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1160 Some(None)
1161 }
1162 Generate::Continue => Some(None),
1163 }
1164 } else {
1165 Some(None)
1166 }
1167 })
1168 .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1169 .into();
1170
1171 let scan_node = HydroNode::Scan {
1172 init: scan_init,
1173 acc: scan_f,
1174 input: Box::new(self.ir_node.into_inner()),
1175 metadata: self.location.new_node_metadata(Stream::<
1176 Option<(K, U)>,
1177 L,
1178 B,
1179 TotalOrder,
1180 ExactlyOnce,
1181 >::collection_kind()),
1182 };
1183
1184 let flatten_f = q!(|d| d)
1185 .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1186 .into();
1187 let flatten_node = HydroNode::FlatMap {
1188 f: flatten_f,
1189 input: Box::new(scan_node),
1190 metadata: self.location.new_node_metadata(KeyedStream::<
1191 K,
1192 U,
1193 L,
1194 B,
1195 TotalOrder,
1196 ExactlyOnce,
1197 >::collection_kind()),
1198 };
1199
1200 KeyedStream::new(self.location.clone(), flatten_node)
1201 }
1202
1203 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1204 /// in-order across the values in each group. But the aggregation function returns a boolean,
1205 /// which when true indicates that the aggregated result is complete and can be released to
1206 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
1207 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1208 /// normal stream elements.
1209 ///
1210 /// # Example
1211 /// ```rust
1212 /// # use hydro_lang::prelude::*;
1213 /// # use futures::StreamExt;
1214 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1215 /// process
1216 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1217 /// .into_keyed()
1218 /// .fold_early_stop(
1219 /// q!(|| 0),
1220 /// q!(|acc, x| {
1221 /// *acc += x;
1222 /// x % 2 == 0
1223 /// }),
1224 /// )
1225 /// # .entries()
1226 /// # }, |mut stream| async move {
1227 /// // Output: { 0: 2, 1: 9 }
1228 /// # for w in vec![(0, 2), (1, 9)] {
1229 /// # assert_eq!(stream.next().await.unwrap(), w);
1230 /// # }
1231 /// # }));
1232 /// ```
1233 pub fn fold_early_stop<A, I, F>(
1234 self,
1235 init: impl IntoQuotedMut<'a, I, L> + Copy,
1236 f: impl IntoQuotedMut<'a, F, L> + Copy,
1237 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1238 where
1239 K: Clone,
1240 I: Fn() -> A + 'a,
1241 F: Fn(&mut A, V) -> bool + 'a,
1242 {
1243 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1244 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1245 let out_without_bound_cast = self.generator(
1246 q!(move || Some(init())),
1247 q!(move |key_state, v| {
1248 if let Some(key_state_value) = key_state.as_mut() {
1249 if f(key_state_value, v) {
1250 Generate::Return(key_state.take().unwrap())
1251 } else {
1252 Generate::Continue
1253 }
1254 } else {
1255 unreachable!()
1256 }
1257 }),
1258 );
1259
1260 KeyedSingleton::new(
1261 out_without_bound_cast.location.clone(),
1262 HydroNode::Cast {
1263 inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1264 metadata: out_without_bound_cast
1265 .location
1266 .new_node_metadata(
1267 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1268 ),
1269 },
1270 )
1271 }
1272
1273 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1274 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1275 /// otherwise the first element would be non-deterministic.
1276 ///
1277 /// # Example
1278 /// ```rust
1279 /// # use hydro_lang::prelude::*;
1280 /// # use futures::StreamExt;
1281 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1282 /// process
1283 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1284 /// .into_keyed()
1285 /// .first()
1286 /// # .entries()
1287 /// # }, |mut stream| async move {
1288 /// // Output: { 0: 2, 1: 3 }
1289 /// # for w in vec![(0, 2), (1, 3)] {
1290 /// # assert_eq!(stream.next().await.unwrap(), w);
1291 /// # }
1292 /// # }));
1293 /// ```
1294 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1295 where
1296 K: Clone,
1297 {
1298 self.fold_early_stop(
1299 q!(|| None),
1300 q!(|acc, v| {
1301 *acc = Some(v);
1302 true
1303 }),
1304 )
1305 .map(q!(|v| v.unwrap()))
1306 }
1307
1308 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
1309 ///
1310 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1311 /// to depend on the order of elements in the group.
1312 ///
1313 /// If the input and output value types are the same and do not require initialization then use
1314 /// [`KeyedStream::reduce`].
1315 ///
1316 /// # Example
1317 /// ```rust
1318 /// # use hydro_lang::prelude::*;
1319 /// # use futures::StreamExt;
1320 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1321 /// let tick = process.tick();
1322 /// let numbers = process
1323 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1324 /// .into_keyed();
1325 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1326 /// batch
1327 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
1328 /// .entries()
1329 /// .all_ticks()
1330 /// # }, |mut stream| async move {
1331 /// // (1, 5), (2, 7)
1332 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1333 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1334 /// # }));
1335 /// ```
1336 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1337 self,
1338 init: impl IntoQuotedMut<'a, I, L>,
1339 comb: impl IntoQuotedMut<'a, F, L>,
1340 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1341 let init = init.splice_fn0_ctx(&self.location).into();
1342 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1343
1344 KeyedSingleton::new(
1345 self.location.clone(),
1346 HydroNode::FoldKeyed {
1347 init,
1348 acc: comb,
1349 input: Box::new(self.ir_node.into_inner()),
1350 metadata: self.location.new_node_metadata(KeyedSingleton::<
1351 K,
1352 A,
1353 L,
1354 B::WhenValueUnbounded,
1355 >::collection_kind()),
1356 },
1357 )
1358 }
1359
1360 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1361 ///
1362 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
/// to depend on the order of elements in the group.
1364 ///
1365 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1366 ///
1367 /// # Example
1368 /// ```rust
1369 /// # use hydro_lang::prelude::*;
1370 /// # use futures::StreamExt;
1371 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1372 /// let tick = process.tick();
1373 /// let numbers = process
1374 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1375 /// .into_keyed();
1376 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1377 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1378 /// # }, |mut stream| async move {
1379 /// // (1, 5), (2, 7)
1380 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1381 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1382 /// # }));
1383 /// ```
1384 pub fn reduce<F: Fn(&mut V, V) + 'a>(
1385 self,
1386 comb: impl IntoQuotedMut<'a, F, L>,
1387 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1388 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1389
1390 KeyedSingleton::new(
1391 self.location.clone(),
1392 HydroNode::ReduceKeyed {
1393 f,
1394 input: Box::new(self.ir_node.into_inner()),
1395 metadata: self.location.new_node_metadata(KeyedSingleton::<
1396 K,
1397 V,
1398 L,
1399 B::WhenValueUnbounded,
1400 >::collection_kind()),
1401 },
1402 )
1403 }
1404
1405 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1406 ///
1407 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
/// to depend on the order of elements in the group.
1409 ///
1410 /// # Example
1411 /// ```rust
1412 /// # use hydro_lang::prelude::*;
1413 /// # use futures::StreamExt;
1414 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1415 /// let tick = process.tick();
1416 /// let watermark = tick.singleton(q!(1));
1417 /// let numbers = process
1418 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1419 /// .into_keyed();
1420 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1421 /// batch
1422 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1423 /// .entries()
1424 /// .all_ticks()
1425 /// # }, |mut stream| async move {
1426 /// // (2, 204)
1427 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1428 /// # }));
1429 /// ```
1430 pub fn reduce_watermark<O, F>(
1431 self,
1432 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1433 comb: impl IntoQuotedMut<'a, F, L>,
1434 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1435 where
1436 O: Clone,
1437 F: Fn(&mut V, V) + 'a,
1438 {
1439 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1440 check_matching_location(&self.location.root(), other.location.outer());
1441 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1442
1443 KeyedSingleton::new(
1444 self.location.clone(),
1445 HydroNode::ReduceKeyedWatermark {
1446 f,
1447 input: Box::new(self.ir_node.into_inner()),
1448 watermark: Box::new(other.ir_node.into_inner()),
1449 metadata: self.location.new_node_metadata(KeyedSingleton::<
1450 K,
1451 V,
1452 L,
1453 B::WhenValueUnbounded,
1454 >::collection_kind()),
1455 },
1456 )
1457 }
1458}
1459
1460impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1461where
1462 K: Eq + Hash,
1463 L: Location<'a>,
1464{
1465 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1466 ///
1467 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1468 ///
1469 /// If the input and output value types are the same and do not require initialization then use
1470 /// [`KeyedStream::reduce_commutative`].
1471 ///
1472 /// # Example
1473 /// ```rust
1474 /// # use hydro_lang::prelude::*;
1475 /// # use futures::StreamExt;
1476 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1477 /// let tick = process.tick();
1478 /// let numbers = process
1479 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1480 /// .into_keyed();
1481 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1482 /// batch
1483 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1484 /// .entries()
1485 /// .all_ticks()
1486 /// # }, |mut stream| async move {
1487 /// // (1, 5), (2, 7)
1488 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1489 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1490 /// # }));
1491 /// ```
1492 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1493 self,
1494 init: impl IntoQuotedMut<'a, I, L>,
1495 comb: impl IntoQuotedMut<'a, F, L>,
1496 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1497 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1498 .fold(init, comb)
1499 }
1500
1501 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1502 ///
1503 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1504 ///
1505 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1506 ///
1507 /// # Example
1508 /// ```rust
1509 /// # use hydro_lang::prelude::*;
1510 /// # use futures::StreamExt;
1511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1512 /// let tick = process.tick();
1513 /// let numbers = process
1514 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1515 /// .into_keyed();
1516 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1517 /// batch
1518 /// .reduce_commutative(q!(|acc, x| *acc += x))
1519 /// .entries()
1520 /// .all_ticks()
1521 /// # }, |mut stream| async move {
1522 /// // (1, 5), (2, 7)
1523 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1524 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1525 /// # }));
1526 /// ```
1527 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1528 self,
1529 comb: impl IntoQuotedMut<'a, F, L>,
1530 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1531 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1532 .reduce(comb)
1533 }
1534
1535 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1536 ///
1537 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1538 ///
1539 /// # Example
1540 /// ```rust
1541 /// # use hydro_lang::prelude::*;
1542 /// # use futures::StreamExt;
1543 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1544 /// let tick = process.tick();
1545 /// let watermark = tick.singleton(q!(1));
1546 /// let numbers = process
1547 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1548 /// .into_keyed();
1549 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1550 /// batch
1551 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1552 /// .entries()
1553 /// .all_ticks()
1554 /// # }, |mut stream| async move {
1555 /// // (2, 204)
1556 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1557 /// # }));
1558 /// ```
1559 pub fn reduce_watermark_commutative<O2, F>(
1560 self,
1561 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1562 comb: impl IntoQuotedMut<'a, F, L>,
1563 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1564 where
1565 O2: Clone,
1566 F: Fn(&mut V, V) + 'a,
1567 {
1568 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1569 .reduce_watermark(other, comb)
1570 }
1571}
1572
1573impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1574where
1575 K: Eq + Hash,
1576 L: Location<'a>,
1577{
1578 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1579 ///
1580 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1581 ///
1582 /// If the input and output value types are the same and do not require initialization then use
1583 /// [`KeyedStream::reduce_idempotent`].
1584 ///
1585 /// # Example
1586 /// ```rust
1587 /// # use hydro_lang::prelude::*;
1588 /// # use futures::StreamExt;
1589 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1590 /// let tick = process.tick();
1591 /// let numbers = process
1592 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1593 /// .into_keyed();
1594 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1595 /// batch
1596 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1597 /// .entries()
1598 /// .all_ticks()
1599 /// # }, |mut stream| async move {
1600 /// // (1, false), (2, true)
1601 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1602 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1603 /// # }));
1604 /// ```
1605 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1606 self,
1607 init: impl IntoQuotedMut<'a, I, L>,
1608 comb: impl IntoQuotedMut<'a, F, L>,
1609 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1610 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1611 .fold(init, comb)
1612 }
1613
1614 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1615 ///
1616 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1617 ///
1618 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1619 ///
1620 /// # Example
1621 /// ```rust
1622 /// # use hydro_lang::prelude::*;
1623 /// # use futures::StreamExt;
1624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1625 /// let tick = process.tick();
1626 /// let numbers = process
1627 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1628 /// .into_keyed();
1629 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1630 /// batch
1631 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1632 /// .entries()
1633 /// .all_ticks()
1634 /// # }, |mut stream| async move {
1635 /// // (1, false), (2, true)
1636 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1637 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1638 /// # }));
1639 /// ```
1640 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1641 self,
1642 comb: impl IntoQuotedMut<'a, F, L>,
1643 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1644 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1645 .reduce(comb)
1646 }
1647
1648 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1649 ///
1650 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1651 ///
1652 /// # Example
1653 /// ```rust
1654 /// # use hydro_lang::prelude::*;
1655 /// # use futures::StreamExt;
1656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657 /// let tick = process.tick();
1658 /// let watermark = tick.singleton(q!(1));
1659 /// let numbers = process
1660 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1661 /// .into_keyed();
1662 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1663 /// batch
1664 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1665 /// .entries()
1666 /// .all_ticks()
1667 /// # }, |mut stream| async move {
1668 /// // (2, true)
1669 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1670 /// # }));
1671 /// ```
1672 pub fn reduce_watermark_idempotent<O2, F>(
1673 self,
1674 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1675 comb: impl IntoQuotedMut<'a, F, L>,
1676 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1677 where
1678 O2: Clone,
1679 F: Fn(&mut V, V) + 'a,
1680 {
1681 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1682 .reduce_watermark(other, comb)
1683 }
1684}
1685
1686impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1687where
1688 K: Eq + Hash,
1689 L: Location<'a>,
1690{
1691 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1692 ///
1693 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1694 /// as there may be non-deterministic duplicates.
1695 ///
1696 /// If the input and output value types are the same and do not require initialization then use
1697 /// [`KeyedStream::reduce_commutative_idempotent`].
1698 ///
1699 /// # Example
1700 /// ```rust
1701 /// # use hydro_lang::prelude::*;
1702 /// # use futures::StreamExt;
1703 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1704 /// let tick = process.tick();
1705 /// let numbers = process
1706 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1707 /// .into_keyed();
1708 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1709 /// batch
1710 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1711 /// .entries()
1712 /// .all_ticks()
1713 /// # }, |mut stream| async move {
1714 /// // (1, false), (2, true)
1715 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1716 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1717 /// # }));
1718 /// ```
1719 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1720 self,
1721 init: impl IntoQuotedMut<'a, I, L>,
1722 comb: impl IntoQuotedMut<'a, F, L>,
1723 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1724 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1725 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1726 .fold(init, comb)
1727 }
1728
1729 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1730 ///
1731 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1732 /// as there may be non-deterministic duplicates.
1733 ///
1734 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1735 ///
1736 /// # Example
1737 /// ```rust
1738 /// # use hydro_lang::prelude::*;
1739 /// # use futures::StreamExt;
1740 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1741 /// let tick = process.tick();
1742 /// let numbers = process
1743 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1744 /// .into_keyed();
1745 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1746 /// batch
1747 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1748 /// .entries()
1749 /// .all_ticks()
1750 /// # }, |mut stream| async move {
1751 /// // (1, false), (2, true)
1752 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1753 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1754 /// # }));
1755 /// ```
1756 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1757 self,
1758 comb: impl IntoQuotedMut<'a, F, L>,
1759 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1760 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1761 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1762 .reduce(comb)
1763 }
1764
/// A special case of [`KeyedStream::reduce_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1766 ///
1767 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1768 /// as there may be non-deterministic duplicates.
1769 ///
1770 /// # Example
1771 /// ```rust
1772 /// # use hydro_lang::prelude::*;
1773 /// # use futures::StreamExt;
1774 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1775 /// let tick = process.tick();
1776 /// let watermark = tick.singleton(q!(1));
1777 /// let numbers = process
1778 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1779 /// .into_keyed();
1780 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1781 /// batch
1782 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1783 /// .entries()
1784 /// .all_ticks()
1785 /// # }, |mut stream| async move {
1786 /// // (2, true)
1787 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1788 /// # }));
1789 /// ```
1790 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1791 self,
1792 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1793 comb: impl IntoQuotedMut<'a, F, L>,
1794 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1795 where
1796 O2: Clone,
1797 F: Fn(&mut V, V) + 'a,
1798 {
1799 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1800 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1801 .reduce_watermark(other, comb)
1802 }
1803
1804 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1805 /// whose keys are not in the bounded stream.
1806 ///
1807 /// # Example
1808 /// ```rust
1809 /// # use hydro_lang::prelude::*;
1810 /// # use futures::StreamExt;
1811 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1812 /// let tick = process.tick();
1813 /// let keyed_stream = process
1814 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1815 /// .batch(&tick, nondet!(/** test */))
1816 /// .into_keyed();
1817 /// let keys_to_remove = process
1818 /// .source_iter(q!(vec![1, 2]))
1819 /// .batch(&tick, nondet!(/** test */));
1820 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1821 /// # .entries()
1822 /// # }, |mut stream| async move {
1823 /// // { 3: ['c'], 4: ['d'] }
1824 /// # for w in vec![(3, 'c'), (4, 'd')] {
1825 /// # assert_eq!(stream.next().await.unwrap(), w);
1826 /// # }
1827 /// # }));
1828 /// ```
1829 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1830 self,
1831 other: Stream<K, L, Bounded, O2, R2>,
1832 ) -> Self {
1833 check_matching_location(&self.location, &other.location);
1834
1835 KeyedStream::new(
1836 self.location.clone(),
1837 HydroNode::AntiJoin {
1838 pos: Box::new(self.ir_node.into_inner()),
1839 neg: Box::new(other.ir_node.into_inner()),
1840 metadata: self.location.new_node_metadata(Self::collection_kind()),
1841 },
1842 )
1843 }
1844}
1845
1846impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1847where
1848 L: Location<'a>,
1849{
1850 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1851 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1852 ///
1853 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1854 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1855 /// argument that declares where the stream will be atomically processed. Batching a stream into
1856 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1857 /// [`Tick`] will introduce asynchrony.
1858 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1859 let out_location = Atomic { tick: tick.clone() };
1860 KeyedStream::new(
1861 out_location.clone(),
1862 HydroNode::BeginAtomic {
1863 inner: Box::new(self.ir_node.into_inner()),
1864 metadata: out_location
1865 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1866 },
1867 )
1868 }
1869
1870 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1871 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1872 /// the order of the input.
1873 ///
1874 /// # Non-Determinism
1875 /// The batch boundaries are non-deterministic and may change across executions.
1876 pub fn batch(
1877 self,
1878 tick: &Tick<L>,
1879 nondet: NonDet,
1880 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1881 let _ = nondet;
1882 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1883 KeyedStream::new(
1884 tick.clone(),
1885 HydroNode::Batch {
1886 inner: Box::new(self.ir_node.into_inner()),
1887 metadata: tick.new_node_metadata(
1888 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1889 ),
1890 },
1891 )
1892 }
1893}
1894
1895impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1896where
1897 L: Location<'a> + NoTick,
1898{
1899 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1900 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1901 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1902 /// used to create the atomic section.
1903 ///
1904 /// # Non-Determinism
1905 /// The batch boundaries are non-deterministic and may change across executions.
1906 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1907 let _ = nondet;
1908 KeyedStream::new(
1909 self.location.clone().tick,
1910 HydroNode::Batch {
1911 inner: Box::new(self.ir_node.into_inner()),
1912 metadata: self.location.tick.new_node_metadata(KeyedStream::<
1913 K,
1914 V,
1915 Tick<L>,
1916 Bounded,
1917 O,
1918 R,
1919 >::collection_kind(
1920 )),
1921 },
1922 )
1923 }
1924
1925 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1926 /// See [`KeyedStream::atomic`] for more details.
1927 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1928 KeyedStream::new(
1929 self.location.tick.l.clone(),
1930 HydroNode::EndAtomic {
1931 inner: Box::new(self.ir_node.into_inner()),
1932 metadata: self
1933 .location
1934 .tick
1935 .l
1936 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1937 },
1938 )
1939 }
1940}
1941
1942impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1943where
1944 L: Location<'a>,
1945{
1946 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1947 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1948 /// is only present in one of the inputs, its values are passed through as-is). The output has
1949 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1950 ///
1951 /// Currently, both input streams must be [`Bounded`]. This operator will block
1952 /// on the first stream until all its elements are available. In a future version,
1953 /// we will relax the requirement on the `other` stream.
1954 ///
1955 /// # Example
1956 /// ```rust
1957 /// # use hydro_lang::prelude::*;
1958 /// # use futures::StreamExt;
1959 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1960 /// let tick = process.tick();
1961 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1962 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1963 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1964 /// # .entries()
1965 /// # }, |mut stream| async move {
1966 /// // { 0: [2, 1], 1: [4, 3] }
1967 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1968 /// # assert_eq!(stream.next().await.unwrap(), w);
1969 /// # }
1970 /// # }));
1971 /// ```
1972 pub fn chain<O2: Ordering, R2: Retries>(
1973 self,
1974 other: KeyedStream<K, V, L, Bounded, O2, R2>,
1975 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1976 where
1977 O: MinOrder<O2>,
1978 R: MinRetries<R2>,
1979 {
1980 check_matching_location(&self.location, &other.location);
1981
1982 KeyedStream::new(
1983 self.location.clone(),
1984 HydroNode::Chain {
1985 first: Box::new(self.ir_node.into_inner()),
1986 second: Box::new(other.ir_node.into_inner()),
1987 metadata: self.location.new_node_metadata(KeyedStream::<
1988 K,
1989 V,
1990 L,
1991 Bounded,
1992 <O as MinOrder<O2>>::Min,
1993 <R as MinRetries<R2>>::Min,
1994 >::collection_kind()),
1995 },
1996 )
1997 }
1998}
1999
2000impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2001where
2002 L: Location<'a>,
2003{
2004 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2005 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2006 /// each key.
2007 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2008 KeyedStream::new(
2009 self.location.outer().clone(),
2010 HydroNode::YieldConcat {
2011 inner: Box::new(self.ir_node.into_inner()),
2012 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2013 K,
2014 V,
2015 L,
2016 Unbounded,
2017 O,
2018 R,
2019 >::collection_kind(
2020 )),
2021 },
2022 )
2023 }
2024
2025 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2026 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2027 /// each key.
2028 ///
2029 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2030 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2031 /// stream's [`Tick`] context.
2032 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2033 let out_location = Atomic {
2034 tick: self.location.clone(),
2035 };
2036
2037 KeyedStream::new(
2038 out_location.clone(),
2039 HydroNode::YieldConcat {
2040 inner: Box::new(self.ir_node.into_inner()),
2041 metadata: out_location.new_node_metadata(KeyedStream::<
2042 K,
2043 V,
2044 Atomic<L>,
2045 Unbounded,
2046 O,
2047 R,
2048 >::collection_kind()),
2049 },
2050 )
2051 }
2052
2053 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2054 /// tick `T` always has the entries of `self` at tick `T - 1`.
2055 ///
2056 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2057 ///
2058 /// This operator enables stateful iterative processing with ticks, by sending data from one
2059 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2060 ///
2061 /// # Example
2062 /// ```rust
2063 /// # use hydro_lang::prelude::*;
2064 /// # use futures::StreamExt;
2065 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2066 /// let tick = process.tick();
2067 /// # // ticks are lazy by default, forces the second tick to run
2068 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2069 /// # let batch_first_tick = process
2070 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2071 /// # .batch(&tick, nondet!(/** test */))
2072 /// # .into_keyed();
2073 /// # let batch_second_tick = process
2074 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2075 /// # .batch(&tick, nondet!(/** test */))
2076 /// # .defer_tick()
2077 /// # .into_keyed(); // appears on the second tick
2078 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2079 /// # batch_first_tick.chain(batch_second_tick);
2080 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2081 /// changes_across_ticks // from the current tick
2082 /// )
2083 /// # .entries().all_ticks()
2084 /// # }, |mut stream| async move {
2085 /// // { 1: [2, 3] } (first tick), { 1: [2, 3, 4], 2: [5] } (second tick), { 1: [4], 2: [5] } (third tick)
2086 /// # for w in vec![(1, 2), (1, 3), (1, 2), (1, 3), (1, 4), (2, 5), (1, 4), (2, 5)] {
2087 /// # assert_eq!(stream.next().await.unwrap(), w);
2088 /// # }
2089 /// # }));
2090 /// ```
2091 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2092 KeyedStream::new(
2093 self.location.clone(),
2094 HydroNode::DeferTick {
2095 input: Box::new(self.ir_node.into_inner()),
2096 metadata: self.location.new_node_metadata(KeyedStream::<
2097 K,
2098 V,
2099 Tick<L>,
2100 Bounded,
2101 O,
2102 R,
2103 >::collection_kind()),
2104 },
2105 )
2106 }
2107}
2108
2109#[cfg(test)]
2110mod tests {
2111 #[cfg(feature = "deploy")]
2112 use futures::{SinkExt, StreamExt};
2113 #[cfg(feature = "deploy")]
2114 use hydro_deploy::Deployment;
2115 use stageleft::q;
2116
2117 use crate::compile::builder::FlowBuilder;
2118 #[cfg(feature = "deploy")]
2119 use crate::live_collections::stream::ExactlyOnce;
2120 use crate::location::Location;
2121 use crate::nondet::nondet;
2122
/// Runs `reduce_watermark` on a locally deployed process with a constant watermark of `1` and
/// input keys `0`, `1` and `2`, and expects the first entry received by the external client to
/// be the summed group for key `2`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn reduce_watermark_filter() {
    let mut deployment = Deployment::new();

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let node_tick = node.tick();
    // The watermark never changes in this test.
    let watermark = node_tick.singleton(q!(1));

    // Sum the values per key, snapshot the result in each tick, and ship the entries out to
    // the external client.
    let sum = node
        .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
        .into_keyed()
        .reduce_watermark(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        )
        .snapshot(&node_tick, nondet!(/** test */))
        .entries()
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect the output port before starting the deployment.
    let mut out = nodes.connect(sum).await;

    deployment.start().await.unwrap();

    // Key `2` occurs twice in the input: 102 + 102 = 204.
    assert_eq!(out.next().await.unwrap(), (2, 204));
}
2162
/// Runs `reduce_watermark_commutative` with a watermark that starts at `1` and is incremented
/// through a tick cycle, plus an extra `(3, 103)` input gated on an externally sent trigger.
/// Expects `(2, 204)` first and, after the trigger is sent, `(3, 103)`.
///
/// NOTE(review): going by the test name, this presumably exercises removal of state for keys
/// the watermark has passed. The assertions themselves only check the two received entries.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn reduce_watermark_garbage_collect() {
    let mut deployment = Deployment::new();

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();
    // External input used by the test body to send `()` trigger messages into the process.
    let (tick_send, tick_trigger) =
        node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let node_tick = node.tick();
    // Watermark cycle: the initial value is `1`, and the value fed back for the next tick is
    // the current value plus one.
    let (watermark_complete_cycle, watermark) =
        node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
    let next_watermark = watermark.clone().map(q!(|v| v + 1));
    watermark_complete_cycle.complete_next_tick(next_watermark);

    // `(3, 103)` is batched per tick and filtered on the first trigger message of that tick.
    // NOTE(review): presumably it only passes in ticks where a trigger arrived; confirm the
    // semantics of `filter_if_some`.
    let tick_triggered_input = node
        .source_iter(q!([(3, 103)]))
        .batch(&node_tick, nondet!(/** test */))
        .filter_if_some(
            tick_trigger
                .clone()
                .batch(&node_tick, nondet!(/** test */))
                .first(),
        )
        .all_ticks();

    // Sum the values per key over the static input interleaved with the triggered input,
    // snapshot the result in each tick, and ship the entries out to the external client.
    let sum = node
        .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
        .interleave(tick_triggered_input)
        .into_keyed()
        .reduce_watermark_commutative(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        )
        .snapshot(&node_tick, nondet!(/** test */))
        .entries()
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_default_optimize()
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect both external ports before starting the deployment.
    let mut tick_send = nodes.connect(tick_send).await;
    let mut out_recv = nodes.connect(sum).await;

    deployment.start().await.unwrap();

    // Key `2` occurs twice in the static input: 102 + 102 = 204.
    assert_eq!(out_recv.next().await.unwrap(), (2, 204));

    // Send the trigger; the next entry received is expected to be the gated `(3, 103)`.
    tick_send.send(()).await.unwrap();

    assert_eq!(out_recv.next().await.unwrap(), (3, 103));
}
2225
/// Folds each batch of a keyed stream into a per-key `Vec` and asserts a single fixed output
/// under exhaustive simulation. The test is marked `#[should_panic]`, i.e. that assertion is
/// expected to fail.
#[test]
#[should_panic]
fn sim_batch_nondet_size() {
    let builder = FlowBuilder::new();
    let external = builder.external::<()>();
    let process = builder.process::<()>();

    let keyed_input = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed();

    let tick = process.tick();
    let batched = keyed_input.batch(&tick, nondet!(/** test */));
    let collected = batched.fold(q!(|| vec![]), q!(|acc, v| acc.push(v)));
    let output_port = collected
        .entries()
        .all_ticks()
        .send_bincode_external(&external);

    builder.sim().exhaustive(async |mut sim| {
        let receiver = sim.connect(&output_port);
        sim.launch();

        receiver
            .assert_yields_only_unordered([(1, vec![1, 2])])
            .await;
    });
}
2252
/// Batches an ordered keyed stream, re-concatenates the batches, and checks both the per-key
/// result of an early-stopping fold and the number of executions the exhaustive simulation
/// explores.
#[test]
fn sim_batch_preserves_group_order() {
    let builder = FlowBuilder::new();
    let external = builder.external::<()>();
    let process = builder.process::<()>();

    let keyed_input = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed();

    let tick = process.tick();
    let rebatched = keyed_input
        .batch(&tick, nondet!(/** test */))
        .all_ticks();
    let output_port = rebatched
        .fold_early_stop(
            q!(|| 0),
            q!(|acc, v| {
                *acc = std::cmp::max(v, *acc);
                *acc >= 2
            }),
        )
        .entries()
        .send_bincode_external(&external);

    let explored = builder.sim().exhaustive(async |mut sim| {
        let receiver = sim.connect(&output_port);
        sim.launch();

        receiver
            .assert_yields_only_unordered([(1, 2), (2, 3)])
            .await;
    });

    // Expected executions (8 in total):
    // - three: each element in its own tick (choose where (2, 3) goes)
    // - two: (1, 1) and (1, 2) share a tick, with (2, 3) before or after
    // - two: (1, 1) and (1, 2) in separate ticks, with (2, 3) grouped with one of them
    // - one: all three elements in the same tick
    assert_eq!(explored, 8);
}
2290
/// Batches a keyed stream whose ordering guarantee has been weakened, re-concatenates the
/// batches, and checks both the emitted entries and the number of executions the exhaustive
/// simulation explores.
#[test]
fn sim_batch_unordered_shuffles() {
    let builder = FlowBuilder::new();
    let external = builder.external::<()>();
    let process = builder.process::<()>();

    let unordered_input = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed()
        .weakest_ordering();

    let tick = process.tick();
    let rebatched = unordered_input
        .batch(&tick, nondet!(/** test */))
        .all_ticks();
    let output_port = rebatched.entries().send_bincode_external(&external);

    let explored = builder.sim().exhaustive(async |mut sim| {
        let receiver = sim.connect(&output_port);
        sim.launch();

        receiver
            .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
            .await;
    });

    // Expected executions (13 in total):
    // - 6 (3 * 2): each element in its own tick (choose where (2, 3) goes), times the order of
    //   (1, 1) and (1, 2)
    // - two: (1, 1) and (1, 2) share a tick, with (2, 3) before or after (their relative order
    //   doesn't matter because the batch is still unordered)
    // - 4 (2 * 2): (1, 1) and (1, 2) in separate ticks, with (2, 3) grouped with one of them,
    //   times the order of (1, 1) and (1, 2)
    // - one: all three elements in the same tick (order of (1, 1), (1, 2) doesn't matter
    //   because the batch is still unordered)
    assert_eq!(explored, 13);
}
2324}