Lines
0 %
Functions
Branches
100 %
//! SLYNK server: lets Emacs SLY (`M-x sly-connect`) drive nomiscript through a
//! real `rpc::Session`. Speaks the SLYNK wire protocol (length-prefixed s-expr
//! RPC) — the exact contract is captured in
//! `doc/editor/slynk-protocol-transcript.org`.
//!
//! Layers: [`frame`] (6-hex length framing), [`sexp`] (the wire s-expr
//! subset), [`events`] (outbound event constructors), [`dispatch`] (pure
//! request classification + reply mapping), and this server loop. Activated by
//! `nms --slynk-port <PORT>`.
//!
//! Concurrency: per connection a **reader task** owns the socket read half and
//! classifies frames; an interrupt frame bumps the engine epoch IMMEDIATELY
//! (the reader holds an `EpochBumper` clone) so `C-g` lands even while an eval
//! is in flight. Eval requests flow over an mpsc to the **main task**, which
//! owns the `Session` and the write half and processes them serially. A
//! blocking eval therefore never starves the interrupt path.
mod dispatch;
mod events;
mod frame;
mod sexp;
use anyhow::{Context, Result};
use rpc::{ScriptCtx, Session};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use uuid::Uuid;
use dispatch::Inbound;
/// Thread id reported to SLY in the `create-mrepl` reply. Channel ids: SLY
/// allocates the local channel; we mirror it as the remote id and use a fixed
/// thread id (single-session, no real threads).
const THREAD_ID: i64 = 1;
/// Capacity of the bounded queue that carries classified `Inbound` messages
/// from the reader task to the main task. SLY pipelines a handful of frames at
/// most; a bound caps memory if a client floods `:process` frames behind a slow
/// eval.
const INBOUND_QUEUE_DEPTH: usize = 64;
/// Runs the SLYNK server: binds `127.0.0.1:port` and serves connections one at
/// a time (a debug REPL is single-session), rebuilding a fresh `Session` per
/// connection so a reconnect starts clean. `user_id` selects the DB-backed
/// `rpc::Session` user (same as `--rpc-user`).
pub async fn serve(port: u16, user_id: Uuid) -> Result<()> {
let listener = TcpListener::bind(("127.0.0.1", port))
.await
.with_context(|| format!("binding SLYNK listener on 127.0.0.1:{port}"))?;
eprintln!("nms SLYNK server listening on 127.0.0.1:{port} (M-x sly-connect)");
loop {
let (sock, peer) = listener.accept().await.context("accepting connection")?;
eprintln!("SLYNK: client connected from {peer}");
let session = Session::new(ScriptCtx::new(user_id))
.map_err(|e| anyhow::anyhow!("rpc::Session::new failed: {e:?}"))?;
if let Err(e) = serve_connection(sock, session).await {
eprintln!("SLYNK: connection ended: {e:#}");
} else {
eprintln!("SLYNK: client disconnected");
}
/// Serves one connection until the peer closes. Splits the socket: the reader
/// task feeds classified inbound messages to this task over `rx`; this task
/// owns the `Session` + the write half and drives the protocol.
async fn serve_connection<S>(sock: S, mut session: Session) -> Result<()>
where
S: AsyncRead + AsyncWrite + Send + 'static,
{
let (read_half, mut write_half) = tokio::io::split(sock);
let bumper = session.epoch_bumper();
let interrupt = session.interrupt_handle();
// Bounded so a client can't enqueue unbounded large `:process` frames while
// a slow eval runs (memory-DoS guard); full → the reader closes the conn.
let (tx, mut rx) = mpsc::channel::<Inbound>(INBOUND_QUEUE_DEPTH);
// Reader task: decode frames, classify, forward. Interrupts are acted on
// here so they don't queue behind an in-flight eval: bump the engine epoch
// (cancels a running Wasm call) AND arm the interrupt latch (catches an
// interrupt that lands during cold compile / before the call starts).
let reader = tokio::spawn(read_loop(read_half, tx, bumper, interrupt));
let mut remote_channel = 1i64;
while let Some(msg) = rx.recv().await {
match msg {
Inbound::ConnectionInfo { id } => {
let pid = i64::from(std::process::id());
send(
&mut write_half,
&events::return_ok(events::connection_info(pid), id),
)
.await?;
Inbound::AddLoadPaths { id } => {
&events::return_ok(sexp::Sexp::Symbol("nil".into()), id),
Inbound::SlynkRequire { id } => {
&events::return_ok(events::require_modules(), id),
Inbound::CreateMrepl { id, local_channel } => {
remote_channel = local_channel;
&events::return_ok(events::create_mrepl_ok(remote_channel, THREAD_ID), id),
// The first prompt is pushed unsolicited to the local channel
// (transcript-confirmed).
send(&mut write_half, &events::prompt(local_channel)).await?;
Inbound::Process { channel, source } => {
let outcome = session.handle_request(&source).await;
for frame in dispatch::eval_reply(channel, &outcome) {
send(&mut write_half, &frame).await?;
Inbound::LoadFile { id, path } => {
// `slynk:load-file` is a plain rex (no mREPL channel), and SLY
// has no top-level `:write-string` event — `sly-load-file`
// renders the `:ok` value in its transcript. So captured output
// is folded into the returned summary by `load_reply`, not sent
// as a separate frame.
let outcome = session.handle_file(&path).await;
send(&mut write_half, &dispatch::load_reply(id, &outcome)).await?;
Inbound::Completions { id, prefix, flex } => {
let names = session.completions(&prefix);
&dispatch::completion_reply(id, &prefix, flex, &names),
Inbound::Ping { thread, tag } => {
send(&mut write_half, &events::emacs_pong(thread, tag)).await?;
Inbound::Interrupt => {
// Already acted on in the reader (epoch bump); nothing to send.
Inbound::AbortRex { id } => {
send(&mut write_half, &events::return_abort("unimplemented", id)).await?;
Inbound::Ignore => {}
let _ = remote_channel;
reader.await.ok();
Ok(())
/// Reads + classifies frames until EOF/error. Forwards each message to the
/// eval task; an `:emacs-interrupt` additionally bumps the epoch right here so
/// an in-flight eval is cancelled without waiting for the channel to drain.
async fn read_loop<R>(
mut read_half: R,
tx: mpsc::Sender<Inbound>,
bumper: rpc::EpochBumper,
interrupt: rpc::InterruptHandle,
) where
R: AsyncRead + Unpin,
match frame::read_frame(&mut read_half).await {
Ok(Some(raw)) => {
let Ok(parsed) = sexp::parse(&raw) else {
// Malformed frame: skip it rather than tearing down the
// connection (matches SLY's tolerance; never panics).
continue;
};
let msg = dispatch::classify(&parsed);
if matches!(msg, Inbound::Interrupt) {
// Cancel a running Wasm call (epoch) AND latch for an eval
// still in its pre-call compile/link phase (the `evaluate`
// start + pre-`call_async` re-check consume the latch).
bumper.bump();
interrupt.interrupt();
if tx.send(msg).await.is_err() {
break; // eval task gone
Ok(None) => break, // clean EOF
Err(_) => break, // malformed framing / read error: end connection
async fn send<W>(writer: &mut W, payload: &str) -> Result<()>
W: AsyncWrite + Unpin,
frame::write_frame(writer, payload)
.context("writing SLYNK frame")