Skip to main content

tui/tabs/
nms_eval.rs

//! Async eval bridge for the Console tab.
//!
//! `ConsoleEval` is a concrete (no `dyn`) actor: it owns an mpsc to a
//! per-console worker that drives `rpc::Session::handle_form`, plus the
//! cancel handles (`EpochBumper` + `InterruptHandle`). The UI side is
//! sync — `submit` is a non-blocking send, `drain` a `try_recv` loop —
//! so the 200 ms `run_loop` never awaits eval.
//!
//! `Session::handle_form` parses an RPC request **envelope**
//! (`(:id N :form <form>)`), not a bare form, so `submit` wraps each
//! input with a monotonic id before handoff — mirroring `nms`'s
//! `RpcEval::eval` and the sshd `eval_channel` worker. The worker
//! receives already-wrapped envelopes and only forwards them.
//!
//! Two constructors form the test seam: [`ConsoleEval::spawn`] builds a
//! real DB-backed `Session` actor, while [`ConsoleEval::echo`] spawns a
//! runtime-only worker that echoes frames back (no `Session`, no DB)
//! for plumbing tests.
19
20use rpc::session::SessionError;
21use rpc::{EpochBumper, InterruptHandle, ScriptCtx, Session};
22use sqlx::types::Uuid;
23use tokio::runtime::Handle;
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::error::TryRecvError;
26use tokio::task::JoinHandle;
27
/// Text of the one-off scrollback line that `ConsoleEval::drain` emits
/// after the worker's result channel has disconnected, so that a
/// submission made against a dead worker is visibly reported instead of
/// appearing to hang.
const WORKER_STOPPED_NOTICE: &str = "eval worker stopped";
31
32/// Async bridge between the sync TUI and an `rpc::Session` eval worker.
33pub struct ConsoleEval {
34    forms_tx: mpsc::UnboundedSender<String>,
35    results_rx: mpsc::UnboundedReceiver<String>,
36    /// Epoch cancel handle for the worker's `Session`. `None` for the
37    /// `echo` test actor, which has no engine to trip.
38    bumper: Option<EpochBumper>,
39    interrupt: InterruptHandle,
40    /// Monotonic envelope id; the wire `:id` of the next submission.
41    next_id: u64,
42    /// Set once `drain` observes the result channel disconnected, so the
43    /// "worker stopped" notice reaches scrollback exactly once rather than
44    /// on every subsequent tick.
45    worker_stopped_reported: bool,
46    /// Held so the worker stays alive for the eval's lifetime. Dropping
47    /// `ConsoleEval` closes `forms_tx`; the worker's `recv` returns
48    /// `None` and the task exits. Read only by the test seam.
49    _worker: JoinHandle<()>,
50}
51
52impl ConsoleEval {
53    /// Wraps `form` in a fresh `(:id {next_id} :form {form})` envelope
54    /// and hands it to the worker. Non-blocking. Returns `false` when the
55    /// channel is closed (the worker has stopped) so the caller can
56    /// surface a notice instead of leaving the console silently hung.
57    pub fn submit(&mut self, form: String) -> bool {
58        let envelope = format!("(:id {} :form {})", self.next_id, form);
59        self.next_id = self.next_id.wrapping_add(1);
60        self.forms_tx.send(envelope).is_ok()
61    }
62
63    /// Drains every ready response without blocking. If the result
64    /// channel has disconnected (the worker exited, possibly mid-eval),
65    /// surfaces a single [`WORKER_STOPPED_NOTICE`] line into the drained
66    /// results so a silently hung submission becomes visible; the notice
67    /// is emitted at most once.
68    pub fn drain(&mut self) -> Vec<String> {
69        let mut out = Vec::new();
70        loop {
71            match self.results_rx.try_recv() {
72                Ok(response) => out.push(response),
73                Err(TryRecvError::Empty) => break,
74                Err(TryRecvError::Disconnected) => {
75                    if !self.worker_stopped_reported {
76                        self.worker_stopped_reported = true;
77                        out.push(WORKER_STOPPED_NOTICE.to_string());
78                    }
79                    break;
80                }
81            }
82        }
83        out
84    }
85
86    /// Requests a cooperative cancel of the in-flight eval. Advances the
87    /// interrupt generation first (caught by `handle_form`'s pre-start
88    /// `check_interrupt` if the eval has not entered Wasm yet) then bumps
89    /// the engine epoch (traps an eval already running) — the
90    /// eval_channel ordering that closes the worker-pickup window.
91    pub fn interrupt(&self) {
92        self.interrupt.interrupt();
93        if let Some(bumper) = &self.bumper {
94            bumper.bump();
95        }
96    }
97
98    /// Builds a real `Session` for `user_id` and spawns its eval worker.
99    pub fn spawn(handle: &Handle, user_id: Uuid) -> Result<Self, SessionError> {
100        Self::from_ctx(handle, ScriptCtx::new(user_id))
101    }
102
103    /// Builds the worker from a caller-supplied context.
104    fn from_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
105        let mut session = Session::new(ctx)?;
106        let bumper = session.epoch_bumper();
107        let interrupt = session.interrupt_handle();
108        let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
109        let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
110        let worker = handle.spawn(async move {
111            while let Some(frame) = forms_rx.recv().await {
112                let response = session.handle_form(&frame).await;
113                if results_tx.send(response).is_err() {
114                    break;
115                }
116            }
117        });
118        Ok(Self {
119            forms_tx,
120            results_rx,
121            bumper: Some(bumper),
122            interrupt,
123            next_id: 0,
124            worker_stopped_reported: false,
125            _worker: worker,
126        })
127    }
128
129    /// Spawns a runtime-only worker that echoes each received frame back
130    /// verbatim. No `Session`, no DB; used to exercise the channel/drain
131    /// plumbing in isolation.
132    pub fn echo(handle: &Handle) -> Self {
133        let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
134        let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
135        let worker = handle.spawn(async move {
136            while let Some(frame) = forms_rx.recv().await {
137                if results_tx.send(frame).is_err() {
138                    break;
139                }
140            }
141        });
142        Self {
143            forms_tx,
144            results_rx,
145            bumper: None,
146            interrupt: InterruptHandle::default(),
147            next_id: 0,
148            worker_stopped_reported: false,
149            _worker: worker,
150        }
151    }
152}
153
#[cfg(test)]
impl ConsoleEval {
    /// Test-only constructor that starts the worker from a
    /// caller-supplied `ScriptCtx`, letting a test inject custom
    /// `ScriptLimits` (for example unbounded fuel for the interrupt
    /// test). Not part of the production surface.
    pub(crate) fn spawn_with_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
        Self::from_ctx(handle, ctx)
    }

    /// Returns an owned abort handle for the worker task. A test can use
    /// it to watch the worker finish after the `ConsoleEval` itself has
    /// been dropped, or to abort the task and simulate a stopped worker.
    pub(crate) fn worker_handle(&self) -> tokio::task::AbortHandle {
        let task = &self._worker;
        task.abort_handle()
    }
}
170
171#[cfg(test)]
172use rpc::ScriptLimits;
173
174#[cfg(test)]
175mod tests;