tui/tabs/nms_eval.rs
1//! Async eval bridge for the Console tab.
2//!
3//! `ConsoleEval` is a concrete (no `dyn`) actor: it owns an mpsc to a
4//! per-console worker that drives `rpc::Session::handle_form`, plus the
5//! cancel handles (`EpochBumper` + `InterruptHandle`). The UI side is
6//! sync — `submit` is a non-blocking send, `drain` a `try_recv` loop —
7//! so the 200 ms `run_loop` never awaits eval.
8//!
9//! `Session::handle_form` parses an RPC request **envelope**
10//! (`(:id N :form <form>)`), not a bare form, so `submit` wraps each
11//! input with a monotonic id before handoff — mirroring `nms`'s
12//! `RpcEval::eval` and the sshd `eval_channel` worker. The worker
13//! receives already-wrapped envelopes and only forwards them.
14//!
15//! Two constructors form the test seam: [`ConsoleEval::spawn`] builds a
16//! real DB-backed `Session` actor, while [`ConsoleEval::echo`] spawns a
17//! runtime-only worker that echoes frames back (no `Session`, no DB)
18//! for plumbing tests.
19
20use rpc::session::SessionError;
21use rpc::{EpochBumper, InterruptHandle, ScriptCtx, Session};
22use sqlx::types::Uuid;
23use tokio::runtime::Handle;
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::error::TryRecvError;
26use tokio::task::JoinHandle;
27
/// Scrollback line emitted once when the eval worker's result channel
/// closes, so a submission against a dead worker does not look hung.
///
/// `ConsoleEval::drain` appends this text to its returned lines the first
/// time `try_recv` reports the channel as disconnected; the
/// `worker_stopped_reported` flag keeps it from being repeated on later
/// calls.
const WORKER_STOPPED_NOTICE: &str = "eval worker stopped";
31
/// Async bridge between the sync TUI and an `rpc::Session` eval worker.
///
/// The UI thread talks to the worker task exclusively through two unbounded
/// channels: wrapped request envelopes go out on `forms_tx`, response lines
/// come back on `results_rx`. Neither direction blocks the caller.
pub struct ConsoleEval {
    /// Sending half of the request channel; `submit` pushes one wrapped
    /// envelope per call. A failed send means the worker's receiver is gone.
    forms_tx: mpsc::UnboundedSender<String>,
    /// Receiving half of the response channel; polled by `drain` with
    /// `try_recv`, never awaited.
    results_rx: mpsc::UnboundedReceiver<String>,
    /// Epoch cancel handle for the worker's `Session`. `None` for the
    /// `echo` test actor, which has no engine to trip.
    bumper: Option<EpochBumper>,
    /// Cooperative interrupt handle. `interrupt` signals it before bumping
    /// the epoch. The `echo` actor carries a default handle that is not
    /// connected to any `Session`.
    interrupt: InterruptHandle,
    /// Monotonic envelope id; the wire `:id` of the next submission.
    /// Starts at 0 and wraps on overflow.
    next_id: u64,
    /// Set once `drain` observes the result channel disconnected, so the
    /// "worker stopped" notice reaches scrollback exactly once rather than
    /// on every subsequent tick.
    worker_stopped_reported: bool,
    /// Join handle of the spawned worker task. The worker's lifetime is
    /// governed by the channels, not by this handle: dropping
    /// `ConsoleEval` closes `forms_tx`, the worker's `recv` returns `None`
    /// and the task exits. Dropping a tokio `JoinHandle` on its own only
    /// detaches the task. The field is read only by the test seam, which
    /// derives an abort handle from it.
    _worker: JoinHandle<()>,
}
51
52impl ConsoleEval {
53 /// Wraps `form` in a fresh `(:id {next_id} :form {form})` envelope
54 /// and hands it to the worker. Non-blocking. Returns `false` when the
55 /// channel is closed (the worker has stopped) so the caller can
56 /// surface a notice instead of leaving the console silently hung.
57 pub fn submit(&mut self, form: String) -> bool {
58 let envelope = format!("(:id {} :form {})", self.next_id, form);
59 self.next_id = self.next_id.wrapping_add(1);
60 self.forms_tx.send(envelope).is_ok()
61 }
62
63 /// Drains every ready response without blocking. If the result
64 /// channel has disconnected (the worker exited, possibly mid-eval),
65 /// surfaces a single [`WORKER_STOPPED_NOTICE`] line into the drained
66 /// results so a silently hung submission becomes visible; the notice
67 /// is emitted at most once.
68 pub fn drain(&mut self) -> Vec<String> {
69 let mut out = Vec::new();
70 loop {
71 match self.results_rx.try_recv() {
72 Ok(response) => out.push(response),
73 Err(TryRecvError::Empty) => break,
74 Err(TryRecvError::Disconnected) => {
75 if !self.worker_stopped_reported {
76 self.worker_stopped_reported = true;
77 out.push(WORKER_STOPPED_NOTICE.to_string());
78 }
79 break;
80 }
81 }
82 }
83 out
84 }
85
86 /// Requests a cooperative cancel of the in-flight eval. Advances the
87 /// interrupt generation first (caught by `handle_form`'s pre-start
88 /// `check_interrupt` if the eval has not entered Wasm yet) then bumps
89 /// the engine epoch (traps an eval already running) — the
90 /// eval_channel ordering that closes the worker-pickup window.
91 pub fn interrupt(&self) {
92 self.interrupt.interrupt();
93 if let Some(bumper) = &self.bumper {
94 bumper.bump();
95 }
96 }
97
98 /// Builds a real `Session` for `user_id` and spawns its eval worker.
99 pub fn spawn(handle: &Handle, user_id: Uuid) -> Result<Self, SessionError> {
100 Self::from_ctx(handle, ScriptCtx::new(user_id))
101 }
102
103 /// Builds the worker from a caller-supplied context.
104 fn from_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
105 let mut session = Session::new(ctx)?;
106 let bumper = session.epoch_bumper();
107 let interrupt = session.interrupt_handle();
108 let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
109 let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
110 let worker = handle.spawn(async move {
111 while let Some(frame) = forms_rx.recv().await {
112 let response = session.handle_form(&frame).await;
113 if results_tx.send(response).is_err() {
114 break;
115 }
116 }
117 });
118 Ok(Self {
119 forms_tx,
120 results_rx,
121 bumper: Some(bumper),
122 interrupt,
123 next_id: 0,
124 worker_stopped_reported: false,
125 _worker: worker,
126 })
127 }
128
129 /// Spawns a runtime-only worker that echoes each received frame back
130 /// verbatim. No `Session`, no DB; used to exercise the channel/drain
131 /// plumbing in isolation.
132 pub fn echo(handle: &Handle) -> Self {
133 let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
134 let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
135 let worker = handle.spawn(async move {
136 while let Some(frame) = forms_rx.recv().await {
137 if results_tx.send(frame).is_err() {
138 break;
139 }
140 }
141 });
142 Self {
143 forms_tx,
144 results_rx,
145 bumper: None,
146 interrupt: InterruptHandle::default(),
147 next_id: 0,
148 worker_stopped_reported: false,
149 _worker: worker,
150 }
151 }
152}
153
#[cfg(test)]
impl ConsoleEval {
    /// Test-only constructor that starts the worker from a caller-supplied
    /// context, letting a test inject custom `ScriptLimits` (e.g.
    /// unbounded fuel for the interrupt test). Not part of the production
    /// surface.
    pub(crate) fn spawn_with_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
        ConsoleEval::from_ctx(handle, ctx)
    }

    /// Returns an owned abort handle for the worker task. A test can use
    /// it to watch the worker finish after the eval itself has been
    /// dropped, or to abort the task and simulate a stopped worker.
    pub(crate) fn worker_handle(&self) -> tokio::task::AbortHandle {
        let worker = &self._worker;
        worker.abort_handle()
    }
}
170
171#[cfg(test)]
172use rpc::ScriptLimits;
173
174#[cfg(test)]
175mod tests;