hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::Debug;
6use std::marker::PhantomData;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11
12use bytes::Bytes;
13use colored::Colorize;
14use dfir_rs::scheduled::graph::Dfir;
15use futures::{FutureExt, Stream, StreamExt};
16use libloading::Library;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use tempfile::TempPath;
20use tokio::sync::mpsc::UnboundedSender;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22
23use super::runtime::SimHook;
24use crate::compile::deploy::ConnectableAsync;
25use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
26use crate::location::dynamic::LocationId;
27use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
28
29/// A handle to a compiled Hydro simulation, which can be instantiated and run.
30pub struct CompiledSim {
31    pub(super) _path: TempPath,
32    pub(super) lib: Library,
33    pub(super) external_ports: Vec<usize>,
34    pub(super) external_registered: HashMap<usize, usize>,
35}
36
37#[sealed::sealed]
38/// A trait implemented by closures that can instantiate a compiled simulation.
39///
40/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
41pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
42#[sealed::sealed]
43impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
44
45fn null_handler(_args: fmt::Arguments) {}
46
47fn println_handler(args: fmt::Arguments) {
48    println!("{}", args);
49}
50
51fn eprintln_handler(args: fmt::Arguments) {
52    eprintln!("{}", args);
53}
54
55type SimLoaded<'a> = libloading::Symbol<
56    'a,
57    unsafe extern "Rust" fn(
58        bool,
59        HashMap<usize, UnboundedSender<Bytes>>,
60        HashMap<usize, UnboundedReceiverStream<Bytes>>,
61        fn(fmt::Arguments<'_>),
62        fn(fmt::Arguments<'_>),
63    ) -> (
64        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
65        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
66        HashMap<(&'static str, Option<u32>), Vec<Box<dyn SimHook>>>,
67    ),
68>;
69
70impl CompiledSim {
71    /// Executes the given closure with a single instance of the compiled simulation.
72    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
73        self.with_instantiator(|instantiator| thunk(instantiator()), true)
74    }
75
76    /// Executes the given closure with an [`Instantiator`], which can be called to create
77    /// independent instances of the simulation. This is useful for fuzzing, where we need to
78    /// re-execute the simulation several times with different decisions.
79    ///
80    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
81    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
82    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
83    pub fn with_instantiator<T>(
84        &self,
85        thunk: impl FnOnce(&dyn Instantiator) -> T,
86        always_log: bool,
87    ) -> T {
88        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
89        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
90        thunk(
91            &(|| CompiledSimInstance {
92                func: func.clone(),
93                remaining_ports: self.external_ports.iter().cloned().collect(),
94                external_registered: self.external_registered.clone(),
95                input_ports: HashMap::new(),
96                output_ports: HashMap::new(),
97                log,
98            }),
99        )
100    }
101
102    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
103    /// closure will be repeatedly executed with instances of the Hydro program where the
104    /// batching boundaries, order of messages, and retries are varied.
105    ///
106    /// During development, you should run the test that invokes this function with the `cargo sim`
107    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
108    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
109    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
110    /// be executed, and if no reproducer is found a small number of random executions will be
111    /// performed.
112    pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
113        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
114            .elements()
115            .into_iter()
116            .find(|e| {
117                !e.fn_name.starts_with("hydro_lang::sim::compiled")
118                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
119                    && !e.fn_name.starts_with("fuzz<")
120            })
121            .unwrap();
122
123        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
124        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
125
126        let caller_fuzz_repro_path = repro_folder
127            .join(caller_fn.fn_name.replace("::", "__"))
128            .with_extension("bin");
129
130        if std::env::var("BOLERO_FUZZER").is_ok() {
131            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
132            std::fs::create_dir_all(&corpus_dir).unwrap();
133            let libfuzzer_args = format!(
134                "{} {} -artifact_prefix={}/ -handle_abrt=0",
135                corpus_dir.to_str().unwrap(),
136                corpus_dir.to_str().unwrap(),
137                corpus_dir.to_str().unwrap(),
138            );
139
140            std::fs::create_dir_all(&repro_folder).unwrap();
141
142            unsafe {
143                std::env::set_var(
144                    "BOLERO_FAILURE_OUTPUT",
145                    caller_fuzz_repro_path.to_str().unwrap(),
146                );
147
148                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
149            }
150
151            self.with_instantiator(
152                |instantiator| {
153                    bolero::test(bolero::TargetLocation {
154                        package_name: "",
155                        manifest_dir: "",
156                        module_path: "",
157                        file: "",
158                        line: 0,
159                        item_path: "<unknown>::__bolero_item_path__",
160                        test_name: None,
161                    })
162                    .run_with_replay(move |is_replay| {
163                        let mut instance = instantiator();
164
165                        if instance.log {
166                            eprintln!(
167                                "{}",
168                                "\n==== New Simulation Instance ===="
169                                    .color(colored::Color::Cyan)
170                                    .bold()
171                            );
172                        }
173
174                        if is_replay {
175                            instance.log = true;
176                        }
177
178                        tokio::runtime::Builder::new_current_thread()
179                            .build()
180                            .unwrap()
181                            .block_on(async {
182                                let local_set = tokio::task::LocalSet::new();
183                                local_set.run_until(thunk(instance)).await
184                            })
185                    })
186                },
187                false,
188            );
189        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
190            self.fuzz_repro(existing_bytes, thunk);
191        } else {
192            eprintln!(
193                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
194                caller_fuzz_repro_path.display()
195            );
196            self.with_instantiator(
197                |instantiator| {
198                    bolero::test(bolero::TargetLocation {
199                        package_name: "",
200                        manifest_dir: "",
201                        module_path: "",
202                        file: ".",
203                        line: 0,
204                        item_path: "<unknown>::__bolero_item_path__",
205                        test_name: None,
206                    })
207                    .with_iterations(8192)
208                    .run(move || {
209                        let instance = instantiator();
210                        tokio::runtime::Builder::new_current_thread()
211                            .build()
212                            .unwrap()
213                            .block_on(async {
214                                let local_set = tokio::task::LocalSet::new();
215                                local_set.run_until(thunk(instance)).await
216                            })
217                    })
218                },
219                false,
220            );
221        }
222    }
223
224    /// Executes the given closure with a single instance of the compiled simulation, using the
225    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
226    /// failure found during fuzzing.
227    pub fn fuzz_repro<'a>(
228        &'a self,
229        bytes: Vec<u8>,
230        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
231    ) {
232        self.with_instance(|instance| {
233            bolero::bolero_engine::any::scope::with(
234                Box::new(bolero::bolero_engine::driver::object::Object(
235                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
236                )),
237                || {
238                    tokio::runtime::Builder::new_current_thread()
239                        .build()
240                        .unwrap()
241                        .block_on(async {
242                            let local_set = tokio::task::LocalSet::new();
243                            local_set.run_until(thunk(instance)).await
244                        })
245                },
246            )
247        });
248    }
249
250    /// Exhaustively searches all possible executions of the simulation. The provided
251    /// closure will be repeatedly executed with instances of the Hydro program where the
252    /// batching boundaries, order of messages, and retries are varied.
253    ///
254    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
255    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
256    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
257    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
258    ///
259    /// Returns the number of distinct executions explored.
260    pub fn exhaustive<'a>(
261        &'a self,
262        mut thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe,
263    ) -> usize {
264        if std::env::var("BOLERO_FUZZER").is_ok() {
265            eprintln!(
266                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
267            );
268            std::process::abort();
269        }
270
271        let mut count = 0;
272        let count_mut = &mut count;
273
274        self.with_instantiator(
275            |instantiator| {
276                bolero::test(bolero::TargetLocation {
277                    package_name: "",
278                    manifest_dir: "",
279                    module_path: "",
280                    file: "",
281                    line: 0,
282                    item_path: "<unknown>::__bolero_item_path__",
283                    test_name: None,
284                })
285                .exhaustive()
286                .run_with_replay(move |is_replay| {
287                    *count_mut += 1;
288
289                    let mut instance = instantiator();
290                    if instance.log {
291                        eprintln!(
292                            "{}",
293                            "\n==== New Simulation Instance ===="
294                                .color(colored::Color::Cyan)
295                                .bold()
296                        );
297                    }
298
299                    if is_replay {
300                        instance.log = true;
301                    }
302
303                    tokio::runtime::Builder::new_current_thread()
304                        .build()
305                        .unwrap()
306                        .block_on(async {
307                            let local_set = tokio::task::LocalSet::new();
308                            local_set.run_until(thunk(instance)).await;
309                        })
310                })
311            },
312            false,
313        );
314
315        count
316    }
317}
318
319/// A single instance of a compiled Hydro simulation, which provides methods to interactively
320/// execute the simulation, feed inputs, and receive outputs.
321pub struct CompiledSimInstance<'a> {
322    func: SimLoaded<'a>,
323    remaining_ports: HashSet<usize>,
324    external_registered: HashMap<usize, usize>,
325    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
326    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
327    log: bool,
328}
329
330impl<'a> CompiledSimInstance<'a> {
331    #[deprecated(note = "Use `connect` instead")]
332    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
333    /// given input port, and returns a closure that can be used to send messages to it.
334    pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
335        &mut self,
336        port: &ExternalBincodeSink<T, M, O, R>,
337    ) -> SimSender<T, O, R> {
338        self.connect(port)
339    }
340
341    #[deprecated(note = "Use `connect` instead")]
342    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
343    /// given output port, and returns a stream that can be used to receive messages from it.
344    pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
345        &mut self,
346        port: &ExternalBincodeStream<T, O, R>,
347    ) -> SimReceiver<'a, T, O, R> {
348        self.connect(port)
349    }
350
351    /// Establishes a connection to the given input or output port, returning either a
352    /// [`SimSender`] (for input ports) or a stream (for output ports). This should be invoked
353    /// before calling [`Self::launch`], and should only be invoked once per port.
354    pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
355        &'b mut self,
356        port: P,
357    ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
358        let mut pinned = std::pin::pin!(port.connect(self));
359        if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
360            v
361        } else {
362            panic!("Connect impl should not have used any async operations");
363        }
364    }
365
366    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
367    /// be invoked after connecting all inputs and outputs, but before receiving any messages.
368    pub fn launch(self) {
369        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
370    }
371
372    /// Returns a future that schedules simulation with the given logger for reporting the
373    /// simulation trace.
374    ///
375    /// See [`Self::launch`] for more details.
376    pub fn schedule_with_logger<W: std::io::Write>(
377        self,
378        log_writer: W,
379    ) -> impl use<W> + Future<Output = ()> {
380        self.schedule_with_maybe_logger(Some(log_writer))
381    }
382
383    fn schedule_with_maybe_logger<W: std::io::Write>(
384        mut self,
385        log_override: Option<W>,
386    ) -> impl use<W> + Future<Output = ()> {
387        for remaining in self.remaining_ports {
388            let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
389            self.output_ports.insert(remaining, sender);
390            self.input_ports.insert(remaining, receiver);
391        }
392
393        let (async_dfirs, tick_dfirs, hooks) = unsafe {
394            (self.func)(
395                colored::control::SHOULD_COLORIZE.should_colorize(),
396                self.output_ports,
397                self.input_ports,
398                if self.log {
399                    println_handler
400                } else {
401                    null_handler
402                },
403                if self.log {
404                    eprintln_handler
405                } else {
406                    null_handler
407                },
408            )
409        };
410        let mut launched = LaunchedSim {
411            async_dfirs: async_dfirs
412                .into_iter()
413                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
414                .collect(),
415            possibly_ready_ticks: vec![],
416            not_ready_ticks: tick_dfirs
417                .into_iter()
418                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
419                .collect(),
420            hooks: hooks
421                .into_iter()
422                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
423                .collect(),
424            log: if self.log {
425                if let Some(w) = log_override {
426                    LogKind::Custom(w)
427                } else {
428                    LogKind::Stderr
429                }
430            } else {
431                LogKind::Null
432            },
433        };
434
435        async move { launched.scheduler().await }
436    }
437}
438
439/// A receiver for an external bincode stream in a simulation.
440pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
441    Pin<Box<dyn Stream<Item = T> + 'a>>,
442    PhantomData<(O, R)>,
443);
444
445impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
446    /// Asserts that the stream has ended and no more messages can possibly arrive.
447    pub async fn assert_no_more(mut self)
448    where
449        T: Debug,
450    {
451        if let Some(next) = self.0.next().await {
452            panic!("Stream yielded unexpected message: {:?}", next);
453        }
454    }
455}
456
457impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
458    /// Receives the next message from the external bincode stream. This will wait until a message
459    /// is available, or return `None` if no more messages can possibly arrive.
460    pub async fn next(&mut self) -> Option<T> {
461        self.0.next().await
462    }
463
464    /// Collects all remaining messages from the external bincode stream into a collection. This
465    /// will wait until no more messages can possibly arrive.
466    pub async fn collect<C: Default + Extend<T>>(self) -> C {
467        self.0.collect().await
468    }
469
470    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
471    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
472    pub async fn assert_yields<T2: Debug>(&mut self, expected: impl IntoIterator<Item = T2>)
473    where
474        T: Debug + PartialEq<T2>,
475    {
476        let mut expected: VecDeque<T2> = expected.into_iter().collect();
477
478        while !expected.is_empty() {
479            if let Some(next) = self.next().await {
480                assert_eq!(next, expected.pop_front().unwrap());
481            } else {
482                panic!("Stream ended early, still expected: {:?}", expected);
483            }
484        }
485    }
486
487    /// Asserts that the stream yields only the expected sequence of messages, in order,
488    /// and then ends.
489    pub async fn assert_yields_only<T2: Debug>(mut self, expected: impl IntoIterator<Item = T2>)
490    where
491        T: Debug + PartialEq<T2>,
492    {
493        self.assert_yields(expected).await;
494        self.assert_no_more().await;
495    }
496}
497
498impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
499    /// Collects all remaining messages from the external bincode stream into a collection,
500    /// sorting them. This will wait until no more messages can possibly arrive.
501    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
502    where
503        T: Ord,
504    {
505        let mut collected: C = self.0.collect().await;
506        collected.as_mut().sort();
507        collected
508    }
509
510    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
511    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
512    pub async fn assert_yields_unordered<T2: Debug>(
513        &mut self,
514        expected: impl IntoIterator<Item = T2>,
515    ) where
516        T: Debug + PartialEq<T2>,
517    {
518        let mut expected: Vec<T2> = expected.into_iter().collect();
519
520        while !expected.is_empty() {
521            if let Some(next) = self.0.next().await {
522                let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
523                if let Some((i, _)) = idx {
524                    expected.swap_remove(i);
525                } else {
526                    panic!("Stream yielded unexpected message: {:?}", next);
527                }
528            } else {
529                panic!("Stream ended early, still expected: {:?}", expected);
530            }
531        }
532    }
533
534    /// Asserts that the stream yields only the expected sequence of messages, in some order,
535    /// and then ends.
536    pub async fn assert_yields_only_unordered<T2: Debug>(
537        mut self,
538        expected: impl IntoIterator<Item = T2>,
539    ) where
540        T: Debug + PartialEq<T2>,
541    {
542        self.assert_yields_unordered(expected).await;
543        self.assert_no_more().await;
544    }
545}
546
547impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
548    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
549{
550    type Output = SimReceiver<'a, T, O, R>;
551
552    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
553        let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
554
555        assert!(ctx.remaining_ports.remove(looked_up));
556        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
557        ctx.output_ports.insert(*looked_up, sink);
558
559        SimReceiver(
560            Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
561            PhantomData,
562        )
563    }
564}
565
566/// A sender to an external bincode sink in a simulation.
567pub struct SimSender<T, O: Ordering, R: Retries>(
568    Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
569    PhantomData<(O, R)>,
570);
571impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
572    /// Sends a message to the external bincode sink. The message will be asynchronously processed
573    /// as part of the simulation.
574    pub fn send(&self, t: T) {
575        (self.0)(t).unwrap()
576    }
577
578    /// Sends several messages to the external bincode sink. The messages will be asynchronously
579    /// processed as part of the simulation.
580    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
581        for t in iter {
582            (self.0)(t).unwrap();
583        }
584    }
585}
586
587impl<T> SimSender<T, NoOrder, ExactlyOnce> {
588    /// Sends several messages to the external bincode sink. The messages will be asynchronously
589    /// processed as part of the simulation, in non-determinstic order.
590    pub fn send_many_unordered<I: IntoIterator<Item = T>>(
591        &self,
592        iter: I,
593    ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
594        for t in iter {
595            (self.0)(t)?;
596        }
597        Ok(())
598    }
599}
600
601impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
602    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
603{
604    type Output = SimSender<T, O, R>;
605
606    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
607        let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
608
609        assert!(ctx.remaining_ports.remove(looked_up));
610        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
611        ctx.input_ports.insert(*looked_up, source);
612        SimSender(
613            Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
614            PhantomData,
615        )
616    }
617}
618
619enum LogKind<W: std::io::Write> {
620    Null,
621    Stderr,
622    Custom(W),
623}
624
625// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
626impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
627    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
628        match self {
629            LogKind::Null => Ok(()),
630            LogKind::Stderr => {
631                eprint!("{}", s);
632                Ok(())
633            }
634            LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
635        }
636    }
637}
638
639type Hooks = HashMap<(LocationId, Option<u32>), Vec<Box<dyn SimHook>>>;
640
641/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
642/// about scheduling ticks and choices for non-deterministic operators like batch.
643struct LaunchedSim<W: std::io::Write> {
644    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
645    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
646    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
647    hooks: Hooks,
648    log: LogKind<W>,
649}
650
651impl<W: std::io::Write> LaunchedSim<W> {
652    async fn scheduler(&mut self) {
653        loop {
654            tokio::task::yield_now().await;
655            let mut any_made_progress = false;
656            for (loc, c_id, dfir) in &mut self.async_dfirs {
657                if dfir.run_tick().await {
658                    any_made_progress = true;
659                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
660                        .not_ready_ticks
661                        .drain(..)
662                        .partition(|(tick_loc, tick_c_id, _)| {
663                            let LocationId::Tick(_, outer) = tick_loc else {
664                                unreachable!()
665                            };
666                            outer.as_ref() == loc && tick_c_id == c_id
667                        });
668
669                    self.possibly_ready_ticks.extend(now_ready);
670                    self.not_ready_ticks.extend(still_not_ready);
671                }
672            }
673
674            if any_made_progress {
675                continue;
676            } else {
677                use bolero::generator::*;
678
679                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
680                    .possibly_ready_ticks
681                    .drain(..)
682                    .partition(|(name, cid, _)| {
683                        self.hooks
684                            .get(&(name.clone(), *cid))
685                            .unwrap()
686                            .iter()
687                            .any(|hook| {
688                                hook.current_decision().unwrap_or(false)
689                                    || hook.can_make_nontrivial_decision()
690                            })
691                    });
692
693                self.possibly_ready_ticks = ready;
694                self.not_ready_ticks.append(&mut not_ready);
695
696                if self.possibly_ready_ticks.is_empty() {
697                    break;
698                } else {
699                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
700                    let mut removed = self.possibly_ready_ticks.remove(next_tick);
701
702                    match &mut self.log {
703                        LogKind::Null => {}
704                        LogKind::Stderr => {
705                            if let Some(cid) = &removed.1 {
706                                eprintln!(
707                                    "\n{}",
708                                    format!("Running Tick (Cluster Member {})", cid)
709                                        .color(colored::Color::Magenta)
710                                        .bold()
711                                )
712                            } else {
713                                eprintln!(
714                                    "\n{}",
715                                    "Running Tick".color(colored::Color::Magenta).bold()
716                                )
717                            }
718                        }
719                        LogKind::Custom(writer) => {
720                            writeln!(
721                                writer,
722                                "\n{}",
723                                "Running Tick".color(colored::Color::Magenta).bold()
724                            )
725                            .unwrap();
726                        }
727                    }
728
729                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
730                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
731                        write.write_str(" ")
732                    };
733
734                    let mut tick_decision_writer =
735                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
736                            inserter: &mut asterisk_indenter,
737                        });
738
739                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
740                    let mut remaining_decision_count = hooks.len();
741                    let mut made_nontrivial_decision = false;
742
743                    bolero_generator::any::scope::borrow_with(|driver| {
744                        // first, scan manual decisions
745                        hooks.iter_mut().for_each(|hook| {
746                            if let Some(is_nontrivial) = hook.current_decision() {
747                                made_nontrivial_decision |= is_nontrivial;
748                                remaining_decision_count -= 1;
749                            } else if !hook.can_make_nontrivial_decision() {
750                                // if no nontrivial decision is possible, make a trivial one
751                                // (we need to do this in the first pass to force nontrivial decisions
752                                // on the remaining hooks)
753                                hook.autonomous_decision(driver, false);
754                                remaining_decision_count -= 1;
755                            }
756                        });
757
758                        hooks.iter_mut().for_each(|hook| {
759                            if hook.current_decision().is_none() {
760                                made_nontrivial_decision |= hook.autonomous_decision(
761                                    driver,
762                                    !made_nontrivial_decision && remaining_decision_count == 1,
763                                );
764                                remaining_decision_count -= 1;
765                            }
766
767                            hook.release_decision(&mut tick_decision_writer);
768                        });
769                    });
770
771                    assert!(removed.2.run_tick().await);
772                    self.possibly_ready_ticks.push(removed);
773                }
774            }
775        }
776    }
777}