1
use super::*;
2

            
3
use std::time::Duration;
4
use tokio::runtime::Handle;
5
use tokio::time::sleep;
6

            
7
/// Polls `drain` until it returns at least one line or the budget
8
/// expires, yielding to let the worker run between polls. The
9
/// real-`Session` path warms a module cache on first eval, so a single
10
/// `try_recv` immediately after `submit` is racy; a bounded poll keeps
11
/// the test deterministic without a fixed sleep that flakes on slow CI.
12
5
async fn drain_one(eval: &mut ConsoleEval) -> Vec<String> {
13
5
    for _ in 0..200 {
14
82
        let lines = eval.drain();
15
82
        if !lines.is_empty() {
16
5
            return lines;
17
77
        }
18
77
        sleep(Duration::from_millis(20)).await;
19
    }
20
    eval.drain()
21
5
}
22

            
23
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
24
1
async fn real_session_evaluates_bare_form_via_envelope_wrapping() {
25
1
    let mut eval = ConsoleEval::spawn(&Handle::current(), Uuid::nil()).expect("spawn session");
26
1
    eval.submit("(+ 1 2)".to_string());
27
1
    let lines = drain_one(&mut eval).await;
28
1
    assert_eq!(lines.len(), 1, "got {lines:?}");
29
1
    let line = &lines[0];
30
1
    assert!(line.contains(":value 3"), "{line:?}");
31
1
    assert!(line.contains(":id 0"), "{line:?}");
32
1
}
33

            
34
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
35
1
async fn submit_increments_the_envelope_id() {
36
1
    let mut eval = ConsoleEval::spawn(&Handle::current(), Uuid::nil()).expect("spawn session");
37
1
    eval.submit("(+ 1 2)".to_string());
38
1
    let first = drain_one(&mut eval).await;
39
1
    assert!(first[0].contains(":id 0"), "{first:?}");
40
1
    eval.submit("(+ 2 2)".to_string());
41
1
    let second = drain_one(&mut eval).await;
42
1
    assert!(second[0].contains(":id 1"), "{second:?}");
43
1
    assert!(second[0].contains(":value 4"), "{second:?}");
44
1
}
45

            
46
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
47
1
async fn echo_actor_round_trips_the_submitted_form() {
48
1
    let mut eval = ConsoleEval::echo(&Handle::current());
49
1
    eval.submit("(foo)".to_string());
50
1
    let lines = drain_one(&mut eval).await;
51
1
    assert_eq!(lines.len(), 1, "got {lines:?}");
52
1
    assert_eq!(lines[0], "(:id 0 :form (foo))");
53
1
}
54

            
55
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
56
1
async fn interrupt_cancels_an_inflight_eval() {
57
    // Fuel is effectively unbounded and the loop runs billions of
58
    // iterations, so the form can never terminate on its own within the
59
    // test window. `interrupt()` must both advance the interrupt
60
    // generation and bump the epoch, so `:code interrupted` is the only
61
    // possible outcome — a real regression guard for the cancel path.
62
1
    let ctx = ScriptCtx::new(Uuid::nil()).with_limits(ScriptLimits {
63
1
        fuel: u64::MAX,
64
1
        ..ScriptLimits::default()
65
1
    });
66
1
    let mut eval = ConsoleEval::spawn_with_ctx(&Handle::current(), ctx).expect("spawn session");
67
1
    eval.submit("(do ((i 0 (+ i 1))) ((>= i 2000000000) i))".to_string());
68
1
    sleep(Duration::from_millis(40)).await;
69
1
    eval.interrupt();
70
1
    let lines = drain_one(&mut eval).await;
71
1
    assert_eq!(lines.len(), 1, "got {lines:?}");
72
1
    assert!(lines[0].contains(":code interrupted"), "{lines:?}");
73
1
}
74

            
75
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
76
1
async fn interrupt_without_inflight_eval_does_not_panic() {
77
1
    let eval = ConsoleEval::echo(&Handle::current());
78
1
    eval.interrupt();
79
1
}
80

            
81
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
82
1
async fn submit_after_worker_stops_reports_failure() {
83
1
    let mut eval = ConsoleEval::echo(&Handle::current());
84
1
    let handle = eval.worker_handle();
85
1
    handle.abort();
86
1
    for _ in 0..200 {
87
2
        if handle.is_finished() {
88
1
            break;
89
1
        }
90
1
        sleep(Duration::from_millis(10)).await;
91
1
    }
92
1
    assert!(
93
1
        !eval.submit("(x)".to_string()),
94
1
        "submit must report failure once the worker has stopped"
95
1
    );
96
1
}
97

            
98
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
99
1
async fn drain_surfaces_worker_stopped_notice_exactly_once() {
100
1
    let mut eval = ConsoleEval::echo(&Handle::current());
101
1
    let handle = eval.worker_handle();
102
1
    handle.abort();
103
1
    for _ in 0..200 {
104
2
        if handle.is_finished() {
105
1
            break;
106
1
        }
107
1
        sleep(Duration::from_millis(10)).await;
108
    }
109
1
    let first = eval.drain();
110
1
    assert_eq!(
111
        first,
112
1
        vec![WORKER_STOPPED_NOTICE.to_string()],
113
        "drain must surface the notice once the worker is gone"
114
    );
115
1
    assert!(
116
1
        eval.drain().is_empty(),
117
1
        "the notice must not repeat on later drains"
118
1
    );
119
1
}
120

            
121
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
122
1
async fn worker_breaks_when_receiver_is_dropped_mid_eval() {
123
1
    let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
124
1
    let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
125
1
    let worker = Handle::current().spawn(async move {
126
1
        while let Some(frame) = forms_rx.recv().await {
127
1
            if results_tx.send(frame).is_err() {
128
1
                break;
129
            }
130
        }
131
1
    });
132
1
    forms_tx.send("(a)".to_string()).expect("queue first frame");
133
1
    forms_tx
134
1
        .send("(b)".to_string())
135
1
        .expect("queue second frame");
136
1
    drop(results_rx);
137
1
    for _ in 0..200 {
138
2
        if worker.is_finished() {
139
1
            return;
140
1
        }
141
1
        sleep(Duration::from_millis(10)).await;
142
1
    }
143
1
    assert!(
144
1
        worker.is_finished(),
145
1
        "worker must break out of its loop once the receiver is gone"
146
1
    );
147
1
}
148

            
149
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
150
1
async fn dropping_the_eval_ends_the_worker() {
151
1
    let eval = ConsoleEval::echo(&Handle::current());
152
1
    let handle = eval.worker_handle();
153
1
    drop(eval);
154
1
    for _ in 0..200 {
155
2
        if handle.is_finished() {
156
1
            return;
157
1
        }
158
1
        sleep(Duration::from_millis(10)).await;
159
1
    }
160
1
    assert!(handle.is_finished(), "worker did not exit after drop");
161
1
}