Lines
93.98 %
Functions
74.19 %
Branches
100 %
//! Per-channel state for the `nomisync-eval` ssh subsystem.
//!
//! Owns the frame decoder (newline + balanced-paren), the epoch cancel
//! handle, and a tokio mpsc to a per-channel worker that drives
//! `rpc::Session::handle_form` independently of the russh handler
//! callback. Decoupling the worker from `data()` is what lets the
//! handler scan subsequent bytes (an emacs `C-g` arrives as the ETX
//! byte `0x03`) and bump the engine epoch synchronously, tripping the
//! in-flight eval and surfacing `(:error (:code interrupted ...))`
//! on the wire.
//!
//! Constructed in `subsystem_request` once the requested subsystem
//! name matches `"nomisync-eval"` AND the session has authenticated
//! (i.e. `user_id.is_some()`). Dropped in `channel_eof` /
//! `channel_close` along with the rest of the per-channel state; the
//! drop closes the mpsc, the worker observes the close and exits.
use russh::ChannelId;
use russh::server::Handle as RusshHandle;
use tokio::sync::mpsc;
use rpc::session::SessionError;
use rpc::{EpochBumper, FrameDecoder, FrameError, InterruptHandle, ScriptCtx, Session};
/// ASCII End-of-Text (`Ctrl-C` / `C-g`), byte value `0x03`. Clients emit
/// this byte on the same channel as form data to request a cooperative
/// cancel of the in-flight `nomi-eval`. `EvalChannelState::feed` strips
/// it from the byte stream before the decoder sees it, so it never
/// becomes part of a frame.
const ETX: u8 = 0x03;
/// Trait for the worker's response back-channel. The russh-backed
/// impl writes to an SSH channel via the server's `Handle`; the
/// mpsc-backed impl drains into a tokio receiver for unit tests.
/// Either way, the worker emits one full response (newline-
/// terminated) per send call.
#[async_trait::async_trait]
pub trait ResponseSink: Send + Sync + 'static {
/// Delivers one complete response. The method returns nothing, so an
/// implementation cannot report a delivery failure to the worker; it
/// has to handle (log or ignore) the failure itself.
async fn send(&self, bytes: Vec<u8>);
}
pub struct RusshSink {
handle: RusshHandle,
channel: ChannelId,
impl RusshSink {
pub fn new(handle: RusshHandle, channel: ChannelId) -> Self {
Self { handle, channel }
impl ResponseSink for RusshSink {
async fn send(&self, bytes: Vec<u8>) {
if let Err(remaining) = self.handle.data(self.channel, bytes).await {
log::warn!(
"eval channel response write dropped {} bytes (channel closed?)",
remaining.len()
);
pub struct EvalChannelState {
decoder: FrameDecoder,
bumper: EpochBumper,
/// Generation-counter cancel signal, shared with the worker's
/// `Session`. Bumped together with the epoch on ETX so an
/// interrupt arriving in the worker-pickup / compile window (before
/// the eval enters Wasm, where an epoch bump would be absorbed by
/// the not-yet-set deadline) is still observed by the session's
/// pre-start `check_interrupt`.
interrupt: InterruptHandle,
forms_tx: mpsc::UnboundedSender<String>,
/// Worker task drives `Session::handle_form` sequentially per
/// channel; held so the worker stays alive for the channel's
/// lifetime. Dropping `EvalChannelState` closes `forms_tx`, the
/// worker's `recv()` returns `None`, and the task exits cleanly.
_worker: tokio::task::JoinHandle<()>,
impl EvalChannelState {
pub fn new(ctx: ScriptCtx, sink: Box<dyn ResponseSink>) -> Result<Self, SessionError> {
let mut session = Session::new(ctx)?;
let bumper = session.epoch_bumper();
let interrupt = session.interrupt_handle();
let (forms_tx, mut forms_rx) = mpsc::unbounded_channel::<String>();
let worker = tokio::spawn(async move {
while let Some(frame) = forms_rx.recv().await {
let mut response = session.handle_form(&frame).await;
response.push('\n');
sink.send(response.into_bytes()).await;
});
Ok(Self {
decoder: FrameDecoder::new(),
bumper,
interrupt,
forms_tx,
_worker: worker,
})
/// Feeds raw bytes received on the channel. Scans for the ETX
/// interrupt token synchronously and, on finding one, signals the
/// cancel both ways: it advances the interrupt generation (caught
/// by `handle_form`'s pre-start `check_interrupt` if the eval has
/// not entered Wasm yet) and bumps the engine epoch (traps an eval
/// already running in `call_async`). Wiring both closes the
/// worker-pickup / compile window, where a lone epoch bump would be
/// absorbed by the not-yet-set deadline and the cancel lost. Either
/// path surfaces a `(:code interrupted ...)` envelope. ETX bytes are
/// stripped before the bytes reach the frame decoder.
///
/// Errors only for non-UTF8 input (the ssh layer should never
/// emit that for an s-expression subsystem).
pub fn feed(&mut self, bytes: &[u8]) -> Result<(), FrameError> {
let has_interrupt = bytes.contains(&ETX);
if has_interrupt {
self.interrupt.interrupt();
self.bumper.bump();
let cleaned: Vec<u8>;
let payload: &[u8] = if has_interrupt {
cleaned = bytes.iter().copied().filter(|b| *b != ETX).collect();
&cleaned
} else {
bytes
};
self.decoder.feed(payload)?;
loop {
match self.decoder.next_frame() {
None => break,
Some(Ok(frame)) => {
if self.forms_tx.send(frame).is_err() {
// Worker exited (channel drop or session
// error). Subsequent frames silently noop;
// the channel will be torn down by the SSH
// layer when the client notices the missing
// responses.
break;
Some(Err(err)) => {
return Err(err);
Ok(())
#[cfg(test)]
mod tests {
    use super::*;
    use rpc::ScriptLimits;
    use sqlx::types::Uuid;

    /// Test sink: forwards each response into an unbounded tokio channel.
    struct MpscSink {
        tx: mpsc::UnboundedSender<Vec<u8>>,
    }

    #[async_trait::async_trait]
    impl ResponseSink for MpscSink {
        async fn send(&self, bytes: Vec<u8>) {
            // A dropped receiver just means the test has finished.
            let _ = self.tx.send(bytes);
        }
    }

    /// Channel state with a fresh default `ScriptCtx`, plus the receiver
    /// that observes the worker's responses.
    fn new_state() -> (EvalChannelState, mpsc::UnboundedReceiver<Vec<u8>>) {
        state_with_ctx(ScriptCtx::new(Uuid::new_v4()))
    }

    /// Same as `new_state`, but with a caller-supplied `ScriptCtx`.
    fn state_with_ctx(ctx: ScriptCtx) -> (EvalChannelState, mpsc::UnboundedReceiver<Vec<u8>>) {
        let (tx, rx) = mpsc::unbounded_channel();
        let sink = Box::new(MpscSink { tx });
        let state = EvalChannelState::new(ctx, sink).expect("session init");
        (state, rx)
    }

    /// Collects `expected` responses, awaiting each rather than
    /// busy-polling a fixed budget so a slow CI runner never spuriously
    /// yields fewer (a genuine hang still fails via the per-response
    /// timeout). After the expected count arrives it keeps draining on a
    /// short grace window, so an over-emission regression (an extra
    /// frame) is collected and trips the caller's count assertion.
    async fn drain_count(
        rx: &mut mpsc::UnboundedReceiver<Vec<u8>>,
        expected: usize,
    ) -> Vec<String> {
        let mut out = Vec::new();
        loop {
            let budget = if out.len() < expected {
                std::time::Duration::from_secs(10)
            } else {
                std::time::Duration::from_millis(100)
            };
            match tokio::time::timeout(budget, rx.recv()).await {
                Ok(Some(b)) => out.push(String::from_utf8(b).unwrap()),
                Ok(None) | Err(_) => break,
            }
        }
        out
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn empty_feed_produces_no_responses() {
        let (mut state, mut rx) = new_state();
        state.feed(b"").unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn complete_frame_yields_response_with_trailing_newline() {
        let (mut state, mut rx) = new_state();
        state
            .feed(b"(:id 1 :form (rpc-protocol-version))\n")
            .unwrap();
        let responses = drain_count(&mut rx, 1).await;
        assert_eq!(responses.len(), 1, "got {responses:?}");
        let r = &responses[0];
        assert!(r.starts_with("(:id 1"), "{r:?}");
        assert!(r.ends_with('\n'), "{r:?}");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn two_concatenated_frames_yield_two_responses() {
        let (mut state, mut rx) = new_state();
        state
            .feed(b"(:id 3 :form (rpc-protocol-version))\n(:id 4 :form (rpc-protocol-version))\n")
            .unwrap();
        let responses = drain_count(&mut rx, 2).await;
        assert_eq!(responses.len(), 2);
        assert!(responses[0].contains(":id 3"));
        assert!(responses[1].contains(":id 4"));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn etx_byte_in_stream_is_stripped() {
        // Send `(form)<ETX>` — the ETX must NOT pollute the decoder
        // (which would refuse the byte as non-paren content) and must
        // NOT prevent the frame from completing.
        let (mut state, mut rx) = new_state();
        let mut bytes = b"(:id 5 :form (rpc-protocol-version))\n".to_vec();
        bytes.push(ETX);
        state.feed(&bytes).unwrap();
        let responses = drain_count(&mut rx, 1).await;
        assert_eq!(responses.len(), 1, "got {responses:?}");
        assert!(responses[0].contains(":id 5"));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn etx_byte_alone_bumps_epoch_without_decoding_a_frame() {
        // Just an ETX with no surrounding form should not produce
        // any output — the bumper fires, but there's nothing to
        // cancel since no eval is in flight.
        let (mut state, mut rx) = new_state();
        state.feed(&[ETX]).unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn etx_byte_after_form_cancels_inflight_eval() {
        // End-to-end cancel scenario through the eval-channel layer:
        // feed a long-running form, then feed an ETX byte on the same
        // channel. `feed` both bumps the epoch and advances the
        // interrupt generation, so the cancel is observed whether the
        // eval is already inside Wasm (epoch trap) or still in the
        // worker-pickup / compile window (pre-start check_interrupt).
        //
        // Fuel is set effectively unbounded and the loop runs billions
        // of iterations (the bound is just under i32::MAX, since the
        // loop counter is an Index / i32), so the form can NEVER
        // terminate on its own (neither a natural value nor a
        // fuel-exhaustion `:code runtime`) within the test window.
        // `:code interrupted` is therefore the only possible outcome —
        // the assertion pins exactly that, which is what makes this a
        // real regression guard for the cancel path rather than a test
        // the fuel cap can satisfy on its own.
        let ctx = ScriptCtx::new(Uuid::new_v4()).with_limits(ScriptLimits {
            fuel: u64::MAX,
            ..ScriptLimits::default()
        });
        let (mut state, mut rx) = state_with_ctx(ctx);
        state
            .feed(b"(:id 9 :form (do ((i 0 (+ i 1))) ((>= i 2000000000) i)))\n")
            .unwrap();
        // Yield so the worker picks the form up and starts running.
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        state.feed(&[ETX]).unwrap();
        let responses = drain_count(&mut rx, 1).await;
        assert_eq!(responses.len(), 1, "got {responses:?}");
        let r = &responses[0];
        assert!(r.contains(":id 9"), "{r:?}");
        assert!(r.contains(":code interrupted"), "{r:?}");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn feed_rejects_non_utf8() {
        let (mut state, _rx) = new_state();
        let invalid: [u8; 2] = [0xC0, 0xC1];
        assert!(state.feed(&invalid).is_err());
    }
}