Lines
95.95 %
Functions
38.46 %
Branches
100 %
//! Async eval bridge for the Console tab.
//!
//! `ConsoleEval` is a concrete (no `dyn`) actor: it owns an mpsc to a
//! per-console worker that drives `rpc::Session::handle_form`, plus the
//! cancel handles (`EpochBumper` + `InterruptHandle`). The UI side is
//! sync — `submit` is a non-blocking send, `drain` a `try_recv` loop —
//! so the 200 ms `run_loop` never awaits eval.
//! `Session::handle_form` parses an RPC request **envelope**
//! (`(:id N :form <form>)`), not a bare form, so `submit` wraps each
//! input with a monotonic id before handoff — mirroring `nms`'s
//! `RpcEval::eval` and the sshd `eval_channel` worker. The worker
//! receives already-wrapped envelopes and only forwards them.
//! Two constructors form the test seam: [`ConsoleEval::spawn`] builds a
//! real DB-backed `Session` actor, while [`ConsoleEval::echo`] spawns a
//! runtime-only worker that echoes frames back (no `Session`, no DB)
//! for plumbing tests.
use rpc::session::SessionError;
use rpc::{EpochBumper, InterruptHandle, ScriptCtx, Session};
use sqlx::types::Uuid;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::task::JoinHandle;
/// Scrollback line that `ConsoleEval::drain` appends, at most once, when it
/// finds the eval worker's result channel disconnected, so a submission
/// against a dead worker does not look hung.
const WORKER_STOPPED_NOTICE: &str = "eval worker stopped";
/// Async bridge between the sync TUI and an `rpc::Session` eval worker.
///
/// Owns both channel endpoints used to talk to the spawned worker task,
/// the cancel handles for the in-flight eval, and the bookkeeping for
/// envelope ids and the one-shot "worker stopped" notice.
pub struct ConsoleEval {
/// Sending half of the request channel. `submit` pushes already-wrapped
/// `(:id N :form <form>)` envelopes through it.
forms_tx: mpsc::UnboundedSender<String>,
/// Receiving half of the response channel. `drain` polls it with
/// `try_recv`, so reading never blocks.
results_rx: mpsc::UnboundedReceiver<String>,
/// Epoch cancel handle for the worker's `Session`. `None` for the
/// `echo` test actor, which has no engine to trip.
bumper: Option<EpochBumper>,
/// Interrupt handle signalled by `interrupt` before the epoch is bumped.
interrupt: InterruptHandle,
/// Monotonic envelope id; the wire `:id` of the next submission.
next_id: u64,
/// Set once `drain` observes the result channel disconnected, so the
/// "worker stopped" notice reaches scrollback exactly once rather than
/// on every subsequent tick.
worker_stopped_reported: bool,
/// Handle to the spawned worker task. Dropping `ConsoleEval` closes
/// `forms_tx`; the worker's `recv` then returns `None` and the task
/// exits. Read only by the test seam (`worker_handle`).
_worker: JoinHandle<()>,
}
impl ConsoleEval {
/// Wraps `form` in a fresh `(:id {next_id} :form {form})` envelope
/// and hands it to the worker. Non-blocking. Returns `false` when the
/// channel is closed (the worker has stopped) so the caller can
/// surface a notice instead of leaving the console silently hung.
pub fn submit(&mut self, form: String) -> bool {
let envelope = format!("(:id {} :form {})", self.next_id, form);
self.next_id = self.next_id.wrapping_add(1);
self.forms_tx.send(envelope).is_ok()
/// Drains every ready response without blocking. If the result
/// channel has disconnected (the worker exited, possibly mid-eval),
/// surfaces a single [`WORKER_STOPPED_NOTICE`] line into the drained
/// results so a silently hung submission becomes visible; the notice
/// is emitted at most once.
pub fn drain(&mut self) -> Vec<String> {
let mut out = Vec::new();
loop {
match self.results_rx.try_recv() {
Ok(response) => out.push(response),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
if !self.worker_stopped_reported {
self.worker_stopped_reported = true;
out.push(WORKER_STOPPED_NOTICE.to_string());
break;
out
/// Requests a cooperative cancel of the in-flight eval. Advances the
/// interrupt generation first (caught by `handle_form`'s pre-start
/// `check_interrupt` if the eval has not entered Wasm yet) then bumps
/// the engine epoch (traps an eval already running) — the
/// eval_channel ordering that closes the worker-pickup window.
pub fn interrupt(&self) {
self.interrupt.interrupt();
if let Some(bumper) = &self.bumper {
bumper.bump();
/// Builds a real `Session` for `user_id` and spawns its eval worker.
pub fn spawn(handle: &Handle, user_id: Uuid) -> Result<Self, SessionError> {
Self::from_ctx(handle, ScriptCtx::new(user_id))
/// Builds the worker from a caller-supplied context.
///
/// Creates the `Session`, takes its epoch and interrupt cancel handles,
/// then moves the session into a task spawned on `handle`. That task feeds
/// each received frame to `Session::handle_form` and sends the response
/// back over the result channel.
///
/// # Errors
///
/// Propagates the `SessionError` returned by `Session::new`.
fn from_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
let mut session = Session::new(ctx)?;
// Take both cancel handles before `session` is moved into the worker task.
let bumper = session.epoch_bumper();
let interrupt = session.interrupt_handle();
// One channel per direction: envelopes in, responses out.
let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();
let worker = handle.spawn(async move {
// `recv` yields `None` once every sender is gone, which ends the loop.
while let Some(frame) = forms_rx.recv().await {
let response = session.handle_form(&frame).await;
// NOTE(review): the body of this `if` is not visible in this excerpt;
// presumably it stops the loop once the receiving side is gone — confirm.
if results_tx.send(response).is_err() {
});
Ok(Self {
forms_tx,
results_rx,
bumper: Some(bumper),
interrupt,
// Envelope ids start at zero.
next_id: 0,
worker_stopped_reported: false,
_worker: worker,
})
/// Spawns a runtime-only worker that echoes each received frame back
/// verbatim. No `Session`, no DB; used to exercise the channel/drain
/// plumbing in isolation.
// NOTE(review): only part of this function is visible in this excerpt — the
// channel setup, the worker loop and the remaining field initialisers are
// missing. Presumably they mirror `from_ctx`; confirm against the full file.
pub fn echo(handle: &Handle) -> Self {
// The received frame itself is sent back as the response.
if results_tx.send(frame).is_err() {
Self {
// No engine behind the echo worker, so there is no epoch to bump.
bumper: None,
// Default-constructed handle rather than one taken from a `Session`.
interrupt: InterruptHandle::default(),
#[cfg(test)]
/// Build the worker from a caller-supplied context — the test seam
/// that injects custom `ScriptLimits` (e.g. unbounded fuel for the
/// interrupt test). Not part of the production surface.
pub(crate) fn spawn_with_ctx(handle: &Handle, ctx: ScriptCtx) -> Result<Self, SessionError> {
Self::from_ctx(handle, ctx)
/// An owned abort handle for the worker task, so a test can observe
/// the worker finishing after the eval itself is dropped or abort it
/// to simulate a stopped worker.
pub(crate) fn worker_handle(&self) -> tokio::task::AbortHandle {
self._worker.abort_handle()
use rpc::ScriptLimits;
mod tests;