Lines
98.35 %
Functions
100 %
Branches
use super::*;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time::sleep;
/// Polls `drain` until it returns at least one line or the budget
/// expires, yielding to let the worker run between polls. The
/// real-`Session` path warms a module cache on first eval, so a single
/// `try_recv` immediately after `submit` is racy; a bounded poll keeps
/// the test deterministic without a fixed sleep that flakes on slow CI.
async fn drain_one(eval: &mut ConsoleEval) -> Vec<String> {
for _ in 0..200 {
let lines = eval.drain();
if !lines.is_empty() {
return lines;
}
sleep(Duration::from_millis(20)).await;
eval.drain()
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn real_session_evaluates_bare_form_via_envelope_wrapping() {
let mut eval = ConsoleEval::spawn(&Handle::current(), Uuid::nil()).expect("spawn session");
eval.submit("(+ 1 2)".to_string());
let lines = drain_one(&mut eval).await;
assert_eq!(lines.len(), 1, "got {lines:?}");
let line = &lines[0];
assert!(line.contains(":value 3"), "{line:?}");
assert!(line.contains(":id 0"), "{line:?}");
/// Checks that successive replies carry successive envelope ids: the first
/// drained reply must contain `:id 0`, and the reply to `(+ 2 2)` must
/// contain `:id 1` together with `:value 4`.
async fn submit_increments_the_envelope_id() {
// NOTE(review): `eval` is never bound in the visible text and no `submit`
// precedes the first `drain_one`; the setup lines, the test attribute and
// the closing brace look lost — confirm against the original file.
let first = drain_one(&mut eval).await;
assert!(first[0].contains(":id 0"), "{first:?}");
// Second submission: its reply is expected to carry the next id.
eval.submit("(+ 2 2)".to_string());
let second = drain_one(&mut eval).await;
assert!(second[0].contains(":id 1"), "{second:?}");
assert!(second[0].contains(":value 4"), "{second:?}");
/// Submits `(foo)` to a `ConsoleEval` built with `ConsoleEval::echo` and
/// expects the first output line to be exactly `(:id 0 :form (foo))`.
async fn echo_actor_round_trips_the_submitted_form() {
let mut eval = ConsoleEval::echo(&Handle::current());
eval.submit("(foo)".to_string());
// NOTE(review): `lines` is not bound in the visible text; presumably it is
// the result of `drain_one(&mut eval).await` — confirm. The closing brace
// is also absent.
assert_eq!(lines[0], "(:id 0 :form (foo))");
/// Starts a long-running `do` loop with `fuel` set to `u64::MAX`, calls
/// `interrupt()` 40 ms after submitting, and expects the first output line
/// to contain `:code interrupted`.
async fn interrupt_cancels_an_inflight_eval() {
// Fuel is effectively unbounded and the loop runs billions of
// iterations, so the form can never terminate on its own within the
// test window. `interrupt()` must both advance the interrupt
// generation and bump the epoch, so `:code interrupted` is the only
// possible outcome — a real regression guard for the cancel path.
let ctx = ScriptCtx::new(Uuid::nil()).with_limits(ScriptLimits {
fuel: u64::MAX,
..ScriptLimits::default()
});
let mut eval = ConsoleEval::spawn_with_ctx(&Handle::current(), ctx).expect("spawn session");
eval.submit("(do ((i 0 (+ i 1))) ((>= i 2000000000) i))".to_string());
// Presumably gives the worker time to begin evaluating before the
// interrupt is issued — confirm the 40 ms margin is intentional.
sleep(Duration::from_millis(40)).await;
eval.interrupt();
// NOTE(review): `lines` is not bound in the visible text; presumably it is
// the result of `drain_one(&mut eval).await` — confirm. The closing brace
// is also absent.
assert!(lines[0].contains(":code interrupted"), "{lines:?}");
/// Builds a `ConsoleEval` with `ConsoleEval::echo` and submits nothing.
async fn interrupt_without_inflight_eval_does_not_panic() {
let eval = ConsoleEval::echo(&Handle::current());
// NOTE(review): the test name implies an `interrupt()` call on this idle
// evaluator, but no such call (nor the closing brace) is visible here —
// confirm against the original file.
/// Aborts the worker task obtained from `worker_handle()`, waits until the
/// handle reports finished, then requires `submit` to return a falsy
/// result.
async fn submit_after_worker_stops_reports_failure() {
// NOTE(review): `eval` is not bound in the visible text — confirm how it is
// constructed.
let handle = eval.worker_handle();
handle.abort();
// NOTE(review): the `break` below has no visible enclosing loop; this looks
// like a poll of `is_finished()` every 10 ms whose loop header and closing
// braces were lost — confirm.
if handle.is_finished() {
break;
sleep(Duration::from_millis(10)).await;
assert!(
!eval.submit("(x)".to_string()),
"submit must report failure once the worker has stopped"
);
/// Expects one `drain` call to return exactly `[WORKER_STOPPED_NOTICE]` and
/// a later `drain` call to return nothing.
async fn drain_surfaces_worker_stopped_notice_exactly_once() {
// NOTE(review): `eval` is not bound and nothing visible stops the worker
// before this drain — the setup lines look lost; confirm.
let first = eval.drain();
assert_eq!(
first,
vec![WORKER_STOPPED_NOTICE.to_string()],
"drain must surface the notice once the worker is gone"
// NOTE(review): the `assert_eq!` above is never closed, and the two lines
// below read like the arguments of a separate `assert!(...)` whose opener
// is missing — confirm.
eval.drain().is_empty(),
"the notice must not repeat on later drains"
/// Uses a stand-in worker task that forwards each frame received on the
/// forms channel to the results channel. Two frames are queued, the results
/// receiver is dropped, and the task is then expected to report finished.
async fn worker_breaks_when_receiver_is_dropped_mid_eval() {
let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
let worker = Handle::current().spawn(async move {
while let Some(frame) = forms_rx.recv().await {
// A failed send means the results receiver has been dropped.
// NOTE(review): the body of this `if` (presumably a `break`) and the
// closing braces of the loop and the spawned block are not visible —
// confirm.
if results_tx.send(frame).is_err() {
forms_tx.send("(a)".to_string()).expect("queue first frame");
forms_tx
.send("(b)".to_string())
.expect("queue second frame");
// Dropping the receiver makes later `results_tx.send` calls fail.
drop(results_rx);
// NOTE(review): this early return looks like part of a polling loop whose
// header and sleep were lost — confirm.
if worker.is_finished() {
return;
// NOTE(review): the two lines below read like the arguments of an
// `assert!(...)` whose opener and closer are missing — confirm.
worker.is_finished(),
"worker must break out of its loop once the receiver is gone"
/// Drops the `ConsoleEval` and requires that the worker handle reports
/// finished afterwards.
async fn dropping_the_eval_ends_the_worker() {
// NOTE(review): neither `eval` nor `handle` is bound in the visible text;
// presumably `handle` comes from `eval.worker_handle()` and the test waits
// between the drop and the assertion — confirm. The closing brace is also
// absent.
drop(eval);
assert!(handle.is_finished(), "worker did not exit after drop");