1use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::Debug;
6use std::marker::PhantomData;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11
12use bytes::Bytes;
13use colored::Colorize;
14use dfir_rs::scheduled::graph::Dfir;
15use futures::{FutureExt, Stream, StreamExt};
16use libloading::Library;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use tempfile::TempPath;
20use tokio::sync::mpsc::UnboundedSender;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22
23use super::runtime::SimHook;
24use crate::compile::deploy::ConnectableAsync;
25use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
26use crate::location::dynamic::LocationId;
27use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
28
/// A simulation that has been compiled into a dynamic library, plus the external-port
/// bookkeeping needed to create instances of it.
pub struct CompiledSim {
    /// Temporary path handle that is held for as long as this value lives; never read here.
    /// NOTE(review): presumably the on-disk location backing `lib` — confirm at the
    /// construction site.
    pub(super) _path: TempPath,
    /// The loaded library; `with_instantiator` looks up the `__hydro_runtime` symbol in it.
    pub(super) lib: Library,
    /// Port ids copied into the `remaining_ports` set of every new instance.
    pub(super) external_ports: Vec<usize>,
    /// Lookup table cloned into every instance and consulted by the `connect` impls: it is
    /// keyed by a port handle's `port_id` and yields the id used in `remaining_ports` and as
    /// the channel-map key.
    pub(super) external_registered: HashMap<usize, usize>,
}
36
/// Sealed shorthand for "an unwind-safe callable that produces a fresh
/// [`CompiledSimInstance`] each time it is invoked".
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket impl: every type that satisfies the supertrait bounds is an `Instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
44
/// Log handler that silently discards whatever it is given.
fn null_handler(_args: fmt::Arguments<'_>) {}
46
/// Log handler that writes the formatted arguments to stdout, followed by a newline.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
50
/// Log handler that writes the formatted arguments to stderr, followed by a newline.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
54
/// Type of the `__hydro_runtime` entry point that is looked up in the compiled library.
///
/// Arguments, in the order `schedule_with_maybe_logger` supplies them:
/// 1. whether output should be colorized,
/// 2. the instance's `output_ports` senders, keyed by port id,
/// 3. the instance's `input_ports` receiver streams, keyed by port id,
/// 4. a handler backed by `println!` (a no-op handler when logging is disabled),
/// 5. a handler backed by `eprintln!` (a no-op handler when logging is disabled).
///
/// The returned triple is consumed as `(async_dfirs, tick_dfirs, hooks)`. Every
/// `&'static str` in it is parsed with `serde_json::from_str` into a `LocationId`; the
/// `Option<u32>` travels alongside it and is printed as the "Cluster Member" in tick logs.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        bool,
        HashMap<usize, UnboundedSender<Bytes>>,
        HashMap<usize, UnboundedReceiverStream<Bytes>>,
        fn(fmt::Arguments<'_>),
        fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        HashMap<(&'static str, Option<u32>), Vec<Box<dyn SimHook>>>,
    ),
>;
69
70impl CompiledSim {
71 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
73 self.with_instantiator(|instantiator| thunk(instantiator()), true)
74 }
75
76 pub fn with_instantiator<T>(
84 &self,
85 thunk: impl FnOnce(&dyn Instantiator) -> T,
86 always_log: bool,
87 ) -> T {
88 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
89 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
90 thunk(
91 &(|| CompiledSimInstance {
92 func: func.clone(),
93 remaining_ports: self.external_ports.iter().cloned().collect(),
94 external_registered: self.external_registered.clone(),
95 input_ports: HashMap::new(),
96 output_ports: HashMap::new(),
97 log,
98 }),
99 )
100 }
101
102 pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
113 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
114 .elements()
115 .into_iter()
116 .find(|e| {
117 !e.fn_name.starts_with("hydro_lang::sim::compiled")
118 && !e.fn_name.starts_with("hydro_lang::sim::flow")
119 && !e.fn_name.starts_with("fuzz<")
120 })
121 .unwrap();
122
123 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
124 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
125
126 let caller_fuzz_repro_path = repro_folder
127 .join(caller_fn.fn_name.replace("::", "__"))
128 .with_extension("bin");
129
130 if std::env::var("BOLERO_FUZZER").is_ok() {
131 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
132 std::fs::create_dir_all(&corpus_dir).unwrap();
133 let libfuzzer_args = format!(
134 "{} {} -artifact_prefix={}/ -handle_abrt=0",
135 corpus_dir.to_str().unwrap(),
136 corpus_dir.to_str().unwrap(),
137 corpus_dir.to_str().unwrap(),
138 );
139
140 std::fs::create_dir_all(&repro_folder).unwrap();
141
142 unsafe {
143 std::env::set_var(
144 "BOLERO_FAILURE_OUTPUT",
145 caller_fuzz_repro_path.to_str().unwrap(),
146 );
147
148 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
149 }
150
151 self.with_instantiator(
152 |instantiator| {
153 bolero::test(bolero::TargetLocation {
154 package_name: "",
155 manifest_dir: "",
156 module_path: "",
157 file: "",
158 line: 0,
159 item_path: "<unknown>::__bolero_item_path__",
160 test_name: None,
161 })
162 .run_with_replay(move |is_replay| {
163 let mut instance = instantiator();
164
165 if instance.log {
166 eprintln!(
167 "{}",
168 "\n==== New Simulation Instance ===="
169 .color(colored::Color::Cyan)
170 .bold()
171 );
172 }
173
174 if is_replay {
175 instance.log = true;
176 }
177
178 tokio::runtime::Builder::new_current_thread()
179 .build()
180 .unwrap()
181 .block_on(async {
182 let local_set = tokio::task::LocalSet::new();
183 local_set.run_until(thunk(instance)).await
184 })
185 })
186 },
187 false,
188 );
189 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
190 self.fuzz_repro(existing_bytes, thunk);
191 } else {
192 eprintln!(
193 "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
194 caller_fuzz_repro_path.display()
195 );
196 self.with_instantiator(
197 |instantiator| {
198 bolero::test(bolero::TargetLocation {
199 package_name: "",
200 manifest_dir: "",
201 module_path: "",
202 file: ".",
203 line: 0,
204 item_path: "<unknown>::__bolero_item_path__",
205 test_name: None,
206 })
207 .with_iterations(8192)
208 .run(move || {
209 let instance = instantiator();
210 tokio::runtime::Builder::new_current_thread()
211 .build()
212 .unwrap()
213 .block_on(async {
214 let local_set = tokio::task::LocalSet::new();
215 local_set.run_until(thunk(instance)).await
216 })
217 })
218 },
219 false,
220 );
221 }
222 }
223
224 pub fn fuzz_repro<'a>(
228 &'a self,
229 bytes: Vec<u8>,
230 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
231 ) {
232 self.with_instance(|instance| {
233 bolero::bolero_engine::any::scope::with(
234 Box::new(bolero::bolero_engine::driver::object::Object(
235 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
236 )),
237 || {
238 tokio::runtime::Builder::new_current_thread()
239 .build()
240 .unwrap()
241 .block_on(async {
242 let local_set = tokio::task::LocalSet::new();
243 local_set.run_until(thunk(instance)).await
244 })
245 },
246 )
247 });
248 }
249
250 pub fn exhaustive<'a>(
261 &'a self,
262 mut thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe,
263 ) -> usize {
264 if std::env::var("BOLERO_FUZZER").is_ok() {
265 eprintln!(
266 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
267 );
268 std::process::abort();
269 }
270
271 let mut count = 0;
272 let count_mut = &mut count;
273
274 self.with_instantiator(
275 |instantiator| {
276 bolero::test(bolero::TargetLocation {
277 package_name: "",
278 manifest_dir: "",
279 module_path: "",
280 file: "",
281 line: 0,
282 item_path: "<unknown>::__bolero_item_path__",
283 test_name: None,
284 })
285 .exhaustive()
286 .run_with_replay(move |is_replay| {
287 *count_mut += 1;
288
289 let mut instance = instantiator();
290 if instance.log {
291 eprintln!(
292 "{}",
293 "\n==== New Simulation Instance ===="
294 .color(colored::Color::Cyan)
295 .bold()
296 );
297 }
298
299 if is_replay {
300 instance.log = true;
301 }
302
303 tokio::runtime::Builder::new_current_thread()
304 .build()
305 .unwrap()
306 .block_on(async {
307 let local_set = tokio::task::LocalSet::new();
308 local_set.run_until(thunk(instance)).await;
309 })
310 })
311 },
312 false,
313 );
314
315 count
316 }
317}
318
/// One not-yet-started run of a compiled simulation: the runtime entry point plus the
/// channels wired up so far for its external ports.
pub struct CompiledSimInstance<'a> {
    /// Entry point of the compiled library; invoked once when the instance is scheduled.
    func: SimLoaded<'a>,
    /// Port ids that no `connect` call has claimed yet. Scheduling gives each of them a
    /// fresh channel whose two ends are both handed to the runtime.
    remaining_ports: HashSet<usize>,
    /// Lookup from a port handle's `port_id` to the id used in the sets/maps of this struct.
    external_registered: HashMap<usize, usize>,
    /// Senders passed to the runtime as its second argument, keyed by port id.
    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
    /// Receiver streams passed to the runtime as its third argument, keyed by port id.
    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
    /// Whether the runtime handlers and the scheduler emit log output.
    log: bool,
}
329
330impl<'a> CompiledSimInstance<'a> {
331 #[deprecated(note = "Use `connect` instead")]
332 pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
335 &mut self,
336 port: &ExternalBincodeSink<T, M, O, R>,
337 ) -> SimSender<T, O, R> {
338 self.connect(port)
339 }
340
341 #[deprecated(note = "Use `connect` instead")]
342 pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
345 &mut self,
346 port: &ExternalBincodeStream<T, O, R>,
347 ) -> SimReceiver<'a, T, O, R> {
348 self.connect(port)
349 }
350
351 pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
355 &'b mut self,
356 port: P,
357 ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
358 let mut pinned = std::pin::pin!(port.connect(self));
359 if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
360 v
361 } else {
362 panic!("Connect impl should not have used any async operations");
363 }
364 }
365
366 pub fn launch(self) {
369 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
370 }
371
372 pub fn schedule_with_logger<W: std::io::Write>(
377 self,
378 log_writer: W,
379 ) -> impl use<W> + Future<Output = ()> {
380 self.schedule_with_maybe_logger(Some(log_writer))
381 }
382
383 fn schedule_with_maybe_logger<W: std::io::Write>(
384 mut self,
385 log_override: Option<W>,
386 ) -> impl use<W> + Future<Output = ()> {
387 for remaining in self.remaining_ports {
388 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
389 self.output_ports.insert(remaining, sender);
390 self.input_ports.insert(remaining, receiver);
391 }
392
393 let (async_dfirs, tick_dfirs, hooks) = unsafe {
394 (self.func)(
395 colored::control::SHOULD_COLORIZE.should_colorize(),
396 self.output_ports,
397 self.input_ports,
398 if self.log {
399 println_handler
400 } else {
401 null_handler
402 },
403 if self.log {
404 eprintln_handler
405 } else {
406 null_handler
407 },
408 )
409 };
410 let mut launched = LaunchedSim {
411 async_dfirs: async_dfirs
412 .into_iter()
413 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
414 .collect(),
415 possibly_ready_ticks: vec![],
416 not_ready_ticks: tick_dfirs
417 .into_iter()
418 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
419 .collect(),
420 hooks: hooks
421 .into_iter()
422 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
423 .collect(),
424 log: if self.log {
425 if let Some(w) = log_override {
426 LogKind::Custom(w)
427 } else {
428 LogKind::Stderr
429 }
430 } else {
431 LogKind::Null
432 },
433 };
434
435 async move { launched.scheduler().await }
436 }
437}
438
/// Receiving end of an external stream of a simulation: a boxed stream of `T` values.
///
/// `O` and `R` are carried only as type-level markers; they select which of the `impl`
/// blocks below (and therefore which methods) are available.
pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
    // The underlying stream of already-decoded values.
    Pin<Box<dyn Stream<Item = T> + 'a>>,
    // Zero-sized marker for the ordering / retries parameters.
    PhantomData<(O, R)>,
);
444
445impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
446 pub async fn assert_no_more(mut self)
448 where
449 T: Debug,
450 {
451 if let Some(next) = self.0.next().await {
452 panic!("Stream yielded unexpected message: {:?}", next);
453 }
454 }
455}
456
457impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
458 pub async fn next(&mut self) -> Option<T> {
461 self.0.next().await
462 }
463
464 pub async fn collect<C: Default + Extend<T>>(self) -> C {
467 self.0.collect().await
468 }
469
470 pub async fn assert_yields<T2: Debug>(&mut self, expected: impl IntoIterator<Item = T2>)
473 where
474 T: Debug + PartialEq<T2>,
475 {
476 let mut expected: VecDeque<T2> = expected.into_iter().collect();
477
478 while !expected.is_empty() {
479 if let Some(next) = self.next().await {
480 assert_eq!(next, expected.pop_front().unwrap());
481 } else {
482 panic!("Stream ended early, still expected: {:?}", expected);
483 }
484 }
485 }
486
487 pub async fn assert_yields_only<T2: Debug>(mut self, expected: impl IntoIterator<Item = T2>)
490 where
491 T: Debug + PartialEq<T2>,
492 {
493 self.assert_yields(expected).await;
494 self.assert_no_more().await;
495 }
496}
497
498impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
499 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
502 where
503 T: Ord,
504 {
505 let mut collected: C = self.0.collect().await;
506 collected.as_mut().sort();
507 collected
508 }
509
510 pub async fn assert_yields_unordered<T2: Debug>(
513 &mut self,
514 expected: impl IntoIterator<Item = T2>,
515 ) where
516 T: Debug + PartialEq<T2>,
517 {
518 let mut expected: Vec<T2> = expected.into_iter().collect();
519
520 while !expected.is_empty() {
521 if let Some(next) = self.0.next().await {
522 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
523 if let Some((i, _)) = idx {
524 expected.swap_remove(i);
525 } else {
526 panic!("Stream yielded unexpected message: {:?}", next);
527 }
528 } else {
529 panic!("Stream ended early, still expected: {:?}", expected);
530 }
531 }
532 }
533
534 pub async fn assert_yields_only_unordered<T2: Debug>(
537 mut self,
538 expected: impl IntoIterator<Item = T2>,
539 ) where
540 T: Debug + PartialEq<T2>,
541 {
542 self.assert_yields_unordered(expected).await;
543 self.assert_no_more().await;
544 }
545}
546
547impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
548 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
549{
550 type Output = SimReceiver<'a, T, O, R>;
551
552 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
553 let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
554
555 assert!(ctx.remaining_ports.remove(looked_up));
556 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
557 ctx.output_ports.insert(*looked_up, sink);
558
559 SimReceiver(
560 Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
561 PhantomData,
562 )
563 }
564}
565
/// Sending end of an external sink of a simulation.
///
/// `O` and `R` are carried only as type-level markers; they select which of the `impl`
/// blocks below (and therefore which send methods) are available.
pub struct SimSender<T, O: Ordering, R: Retries>(
    // Callback that takes one value and reports whether handing it to the channel succeeded.
    Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
    // Zero-sized marker for the ordering / retries parameters.
    PhantomData<(O, R)>,
);
571impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
572 pub fn send(&self, t: T) {
575 (self.0)(t).unwrap()
576 }
577
578 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
581 for t in iter {
582 (self.0)(t).unwrap();
583 }
584 }
585}
586
587impl<T> SimSender<T, NoOrder, ExactlyOnce> {
588 pub fn send_many_unordered<I: IntoIterator<Item = T>>(
591 &self,
592 iter: I,
593 ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
594 for t in iter {
595 (self.0)(t)?;
596 }
597 Ok(())
598 }
599}
600
601impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
602 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
603{
604 type Output = SimSender<T, O, R>;
605
606 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
607 let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
608
609 assert!(ctx.remaining_ports.remove(looked_up));
610 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
611 ctx.input_ports.insert(*looked_up, source);
612 SimSender(
613 Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
614 PhantomData,
615 )
616 }
617}
618
/// Where the scheduler's log text goes.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to the process's stderr.
    Stderr,
    /// Write the raw bytes to a caller-supplied writer.
    Custom(W),
}

impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Routes `s` to the selected destination. Only the `Custom` variant can fail: any I/O
    /// error from the writer is reported as `std::fmt::Error`.
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
        if let LogKind::Custom(sink) = self {
            return sink.write_all(s.as_bytes()).or(Err(std::fmt::Error));
        }

        if matches!(self, LogKind::Stderr) {
            eprint!("{}", s);
        }

        Ok(())
    }
}
638
/// Hooks registered per `(location, optional cluster id)` pair; the scheduler looks them up
/// with the location and id of each tick graph.
type Hooks = HashMap<(LocationId, Option<u32>), Vec<Box<dyn SimHook>>>;
640
/// State of a running simulation, driven by `scheduler`.
struct LaunchedSim<W: std::io::Write> {
    /// Graphs that the scheduler runs on every pass of its loop.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick graphs that are candidates for being run next.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick graphs that are currently not candidates; they are promoted when an async graph
    /// at their enclosing location (same cluster id) reports progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Decision hooks, keyed by the tick's location and cluster id.
    hooks: Hooks,
    /// Destination of the scheduler's log output.
    log: LogKind<W>,
}
650
impl<W: std::io::Write> LaunchedSim<W> {
    /// Drives the simulation until no tick is left that could be run.
    ///
    /// Each pass of the loop yields to the Tokio scheduler, then runs every async graph.
    /// While any of them reports progress, the loop only repeats that step. Once they are
    /// all quiescent, one candidate tick is chosen through the bolero generator, its hooks
    /// settle their decisions, and the tick is run. The loop ends when no candidate remains.
    ///
    /// # Panics
    /// Panics if a tick has no entry in `hooks`, if writing to a custom log writer fails, or
    /// if the chosen tick's `run_tick` returns `false`.
    async fn scheduler(&mut self) {
        loop {
            // Give other tasks on the executor a chance to run before each pass.
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                // A `true` result from `run_tick` is treated as "made progress".
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Promote every not-ready tick whose enclosing location and cluster id
                    // match the graph that just made progress.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            // Entries of the tick lists are expected to always be
                            // `LocationId::Tick`.
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);
                }
            }

            if any_made_progress {
                // Keep running the async graphs until none of them reports progress.
                continue;
            } else {
                use bolero::generator::*;

                // Keep as candidates only the ticks that have at least one hook which has
                // already decided `true` or reports that it can make a nontrivial decision.
                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready;
                self.not_ready_ticks.append(&mut not_ready);

                if self.possibly_ready_ticks.is_empty() {
                    // Nothing left that could run: the simulation is finished.
                    break;
                } else {
                    // The index of the tick to run comes from the bolero generator.
                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
                    let mut removed = self.possibly_ready_ticks.remove(next_tick);

                    // NOTE(review): the `Custom` branch never prints the cluster member id,
                    // unlike the `Stderr` branch — confirm whether that is intended.
                    match &mut self.log {
                        LogKind::Null => {}
                        LogKind::Stderr => {
                            if let Some(cid) = &removed.1 {
                                eprintln!(
                                    "\n{}",
                                    format!("Running Tick (Cluster Member {})", cid)
                                        .color(colored::Color::Magenta)
                                        .bold()
                                )
                            } else {
                                eprintln!(
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                            }
                        }
                        LogKind::Custom(writer) => {
                            writeln!(
                                writer,
                                "\n{}",
                                "Running Tick".color(colored::Color::Magenta).bold()
                            )
                            .unwrap();
                        }
                    }

                    // Line prefix for the hooks' log output: a bold magenta `*` and a space.
                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                        write.write_str(" ")
                    };

                    let mut tick_decision_writer =
                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
                            inserter: &mut asterisk_indenter,
                        });

                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                    // Number of hooks that still have to settle on a decision.
                    let mut remaining_decision_count = hooks.len();
                    let mut made_nontrivial_decision = false;

                    bolero_generator::any::scope::borrow_with(|driver| {
                        // First pass: account for hooks that already hold a decision, and
                        // let hooks that cannot make a nontrivial decision decide right away
                        // (second argument `false`).
                        hooks.iter_mut().for_each(|hook| {
                            if let Some(is_nontrivial) = hook.current_decision() {
                                made_nontrivial_decision |= is_nontrivial;
                                remaining_decision_count -= 1;
                            } else if !hook.can_make_nontrivial_decision() {
                                hook.autonomous_decision(driver, false);
                                remaining_decision_count -= 1;
                            }
                        });

                        // Second pass: the remaining hooks decide. The second argument is
                        // `true` only for the last undecided hook when no nontrivial
                        // decision has been made so far (presumably forcing a nontrivial
                        // decision — confirm against `SimHook`). Every hook then releases
                        // its decision, writing to the indented log writer.
                        hooks.iter_mut().for_each(|hook| {
                            if hook.current_decision().is_none() {
                                made_nontrivial_decision |= hook.autonomous_decision(
                                    driver,
                                    !made_nontrivial_decision && remaining_decision_count == 1,
                                );
                                remaining_decision_count -= 1;
                            }

                            hook.release_decision(&mut tick_decision_writer);
                        });
                    });

                    // The chosen tick must report `true` from `run_tick`; it then stays a
                    // candidate for the next pass.
                    assert!(removed.2.run_tick().await);
                    self.possibly_ready_ticks.push(removed);
                }
            }
        }
    }
}