1
//! Async eval bridge for the Console tab.
2
//!
3
//! `ConsoleEval` is a concrete (no `dyn`) actor: it owns an mpsc to a
4
//! per-console worker that drives `rpc::Session::handle_form`, plus the
5
//! cancel handles (`EpochBumper` + `InterruptHandle`). The UI side is
6
//! sync — `submit` is a non-blocking send, `drain` a `try_recv` loop —
7
//! so the 200 ms `run_loop` never awaits eval.
8
//!
9
//! `Session::handle_form` parses an RPC request **envelope**
10
//! (`(:id N :form <form>)`), not a bare form, so `submit` wraps each
11
//! input with a monotonic id before handoff — mirroring `nms`'s
12
//! `RpcEval::eval` and the sshd `eval_channel` worker. The worker
13
//! receives already-wrapped envelopes and only forwards them.
14
//!
15
//! Two constructors form the test seam: [`ConsoleEval::spawn`] builds a
16
//! real DB-backed `Session` actor, while [`ConsoleEval::echo`] spawns a
17
//! runtime-only worker that echoes frames back (no `Session`, no DB)
18
//! for plumbing tests.
19

            
20
use rpc::session::SessionError;
21
use rpc::{EpochBumper, InterruptHandle, ScriptCtx, Session};
22
use sqlx::types::Uuid;
23
use tokio::runtime::Handle;
24
use tokio::sync::mpsc;
25
use tokio::sync::mpsc::error::TryRecvError;
26
use tokio::task::JoinHandle;
27

            
28
/// Text pushed into the drained results the first time `drain` finds the
/// worker's result channel disconnected, so that a submission made against
/// a dead worker is visibly reported instead of appearing to hang.
const WORKER_STOPPED_NOTICE: &str = "eval worker stopped";
31

            
32
/// Async bridge between the sync TUI and an `rpc::Session` eval worker.
33
pub struct ConsoleEval {
34
    forms_tx: mpsc::UnboundedSender<String>,
35
    results_rx: mpsc::UnboundedReceiver<String>,
36
    /// Epoch cancel handle for the worker's `Session`. `None` for the
37
    /// `echo` test actor, which has no engine to trip.
38
    bumper: Option<EpochBumper>,
39
    interrupt: InterruptHandle,
40
    /// Monotonic envelope id; the wire `:id` of the next submission.
41
    next_id: u64,
42
    /// Set once `drain` observes the result channel disconnected, so the
43
    /// "worker stopped" notice reaches scrollback exactly once rather than
44
    /// on every subsequent tick.
45
    worker_stopped_reported: bool,
46
    /// Held so the worker stays alive for the eval's lifetime. Dropping
47
    /// `ConsoleEval` closes `forms_tx`; the worker's `recv` returns
48
    /// `None` and the task exits. Read only by the test seam.
49
    _worker: JoinHandle<()>,
50
}
51

            
52
impl ConsoleEval {
53
    /// Wraps `form` in a fresh `(:id {next_id} :form {form})` envelope
54
    /// and hands it to the worker. Non-blocking. Returns `false` when the
55
    /// channel is closed (the worker has stopped) so the caller can
56
    /// surface a notice instead of leaving the console silently hung.
57
10
    pub fn submit(&mut self, form: String) -> bool {
58
10
        let envelope = format!("(:id {} :form {})", self.next_id, form);
59
10
        self.next_id = self.next_id.wrapping_add(1);
60
10
        self.forms_tx.send(envelope).is_ok()
61
10
    }
62

            
63
    /// Drains every ready response without blocking. If the result
64
    /// channel has disconnected (the worker exited, possibly mid-eval),
65
    /// surfaces a single [`WORKER_STOPPED_NOTICE`] line into the drained
66
    /// results so a silently hung submission becomes visible; the notice
67
    /// is emitted at most once.
68
106
    pub fn drain(&mut self) -> Vec<String> {
69
106
        let mut out = Vec::new();
70
        loop {
71
114
            match self.results_rx.try_recv() {
72
8
                Ok(response) => out.push(response),
73
104
                Err(TryRecvError::Empty) => break,
74
                Err(TryRecvError::Disconnected) => {
75
2
                    if !self.worker_stopped_reported {
76
1
                        self.worker_stopped_reported = true;
77
1
                        out.push(WORKER_STOPPED_NOTICE.to_string());
78
1
                    }
79
2
                    break;
80
                }
81
            }
82
        }
83
106
        out
84
106
    }
85

            
86
    /// Requests a cooperative cancel of the in-flight eval. Advances the
87
    /// interrupt generation first (caught by `handle_form`'s pre-start
88
    /// `check_interrupt` if the eval has not entered Wasm yet) then bumps
89
    /// the engine epoch (traps an eval already running) — the
90
    /// eval_channel ordering that closes the worker-pickup window.
91
3
    pub fn interrupt(&self) {
92
3
        self.interrupt.interrupt();
93
3
        if let Some(bumper) = &self.bumper {
94
2
            bumper.bump();
95
2
        }
96
3
    }
97

            
98
    /// Builds a real `Session` for `user_id` and spawns its eval worker.
99
2
    pub fn spawn(handle: &Handle, user_id: Uuid) -> Result<Self, SessionError> {
100
2
        Self::from_ctx(handle, ScriptCtx::new(user_id))
101
2
    }
102

            
103
    /// Builds the worker from a caller-supplied context.
104
4
    fn from_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
105
4
        let mut session = Session::new(ctx)?;
106
4
        let bumper = session.epoch_bumper();
107
4
        let interrupt = session.interrupt_handle();
108
4
        let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
109
4
        let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
110
4
        let worker = handle.spawn(async move {
111
9
            while let Some(frame) = forms_rx.recv().await {
112
5
                let response = session.handle_form(&frame).await;
113
5
                if results_tx.send(response).is_err() {
114
                    break;
115
5
                }
116
            }
117
        });
118
4
        Ok(Self {
119
4
            forms_tx,
120
4
            results_rx,
121
4
            bumper: Some(bumper),
122
4
            interrupt,
123
4
            next_id: 0,
124
4
            worker_stopped_reported: false,
125
4
            _worker: worker,
126
4
        })
127
4
    }
128

            
129
    /// Spawns a runtime-only worker that echoes each received frame back
130
    /// verbatim. No `Session`, no DB; used to exercise the channel/drain
131
    /// plumbing in isolation.
132
8
    pub fn echo(handle: &Handle) -> Self {
133
8
        let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
134
8
        let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
135
8
        let worker = handle.spawn(async move {
136
8
            while let Some(frame) = forms_rx.recv().await {
137
3
                if results_tx.send(frame).is_err() {
138
                    break;
139
3
                }
140
            }
141
3
        });
142
8
        Self {
143
8
            forms_tx,
144
8
            results_rx,
145
8
            bumper: None,
146
8
            interrupt: InterruptHandle::default(),
147
8
            next_id: 0,
148
8
            worker_stopped_reported: false,
149
8
            _worker: worker,
150
8
        }
151
8
    }
152
}
153

            
154
#[cfg(test)]
impl ConsoleEval {
    /// Test seam: builds the worker from a caller-supplied context, so a
    /// test can inject custom `ScriptLimits` (e.g. unbounded fuel for the
    /// interrupt test). Compiled only under `cfg(test)`; not part of the
    /// production surface.
    ///
    /// # Errors
    ///
    /// Returns whatever `from_ctx` returns.
    pub(crate) fn spawn_with_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
        Self::from_ctx(handle, ctx)
    }

    /// Returns an owned abort handle for the worker task. A test can use
    /// it to observe the worker finishing after the eval itself has been
    /// dropped, or to abort the task and simulate a stopped worker.
    pub(crate) fn worker_handle(&self) -> tokio::task::AbortHandle {
        let task = &self._worker;
        task.abort_handle()
    }
}
170

            
171
#[cfg(test)]
172
use rpc::ScriptLimits;
173

            
174
#[cfg(test)]
175
mod tests;