//! SLYNK server: lets Emacs SLY (`M-x sly-connect`) drive nomiscript through a
//! real `rpc::Session`. Speaks the SLYNK wire protocol (length-prefixed s-expr
//! RPC) — the exact contract is captured in
//! `doc/editor/slynk-protocol-transcript.org`.
//!
//! Layers: [`frame`] (6-hex length framing), [`sexp`] (the wire s-expr
//! subset), [`events`] (outbound event constructors), [`dispatch`] (pure
//! request classification + reply mapping), and this server loop. Activated by
//! `nms --slynk-port <PORT>`.
//!
//! Concurrency: per connection a **reader task** owns the socket read half and
//! classifies frames; an interrupt frame bumps the engine epoch IMMEDIATELY
//! (the reader holds an `EpochBumper` clone) so `C-g` lands even while an eval
//! is in flight. Eval requests flow over an mpsc to the **main task**, which
//! owns the `Session` and the write half and processes them serially. A
//! blocking eval therefore never starves the interrupt path.

            
18
mod dispatch;
19
mod events;
20
mod frame;
21
mod sexp;
22

            
23
use anyhow::{Context, Result};
24
use rpc::{ScriptCtx, Session};
25
use tokio::io::{AsyncRead, AsyncWrite};
26
use tokio::net::TcpListener;
27
use tokio::sync::mpsc;
28
use uuid::Uuid;
29

            
30
use dispatch::Inbound;
31

            
32
/// Channel ids: SLY allocates the local channel; we mirror it as the remote id
33
/// and use a fixed thread id (single-session, no real threads).
34
// Sent as the thread id in the `create_mrepl_ok` reply built by `serve_connection`.
const THREAD_ID: i64 = 1;
35

            
36
/// Inbound message queue depth. SLY pipelines a handful of frames at most; a
37
/// bound caps memory if a client floods `:process` frames behind a slow eval.
38
// Capacity of the reader-to-main `mpsc::channel` created in `serve_connection`.
const INBOUND_QUEUE_DEPTH: usize = 64;
39

            
40
/// Runs the SLYNK server: binds `127.0.0.1:port` and serves connections one at
41
/// a time (a debug REPL is single-session), rebuilding a fresh `Session` per
42
/// connection so a reconnect starts clean. `user_id` selects the DB-backed
43
/// `rpc::Session` user (same as `--rpc-user`).
44
pub async fn serve(port: u16, user_id: Uuid) -> Result<()> {
45
    let listener = TcpListener::bind(("127.0.0.1", port))
46
        .await
47
        .with_context(|| format!("binding SLYNK listener on 127.0.0.1:{port}"))?;
48
    eprintln!("nms SLYNK server listening on 127.0.0.1:{port} (M-x sly-connect)");
49
    loop {
50
        let (sock, peer) = listener.accept().await.context("accepting connection")?;
51
        eprintln!("SLYNK: client connected from {peer}");
52
        let session = Session::new(ScriptCtx::new(user_id))
53
            .map_err(|e| anyhow::anyhow!("rpc::Session::new failed: {e:?}"))?;
54
        if let Err(e) = serve_connection(sock, session).await {
55
            eprintln!("SLYNK: connection ended: {e:#}");
56
        } else {
57
            eprintln!("SLYNK: client disconnected");
58
        }
59
    }
60
}
61

            
62
/// Serves one connection until the peer closes. Splits the socket: the reader
63
/// task feeds classified inbound messages to this task over `rx`; this task
64
/// owns the `Session` + the write half and drives the protocol.
65
async fn serve_connection<S>(sock: S, mut session: Session) -> Result<()>
66
where
67
    S: AsyncRead + AsyncWrite + Send + 'static,
68
{
69
    let (read_half, mut write_half) = tokio::io::split(sock);
70
    let bumper = session.epoch_bumper();
71
    let interrupt = session.interrupt_handle();
72
    // Bounded so a client can't enqueue unbounded large `:process` frames while
73
    // a slow eval runs (memory-DoS guard); full → the reader closes the conn.
74
    let (tx, mut rx) = mpsc::channel::<Inbound>(INBOUND_QUEUE_DEPTH);
75

            
76
    // Reader task: decode frames, classify, forward. Interrupts are acted on
77
    // here so they don't queue behind an in-flight eval: bump the engine epoch
78
    // (cancels a running Wasm call) AND arm the interrupt latch (catches an
79
    // interrupt that lands during cold compile / before the call starts).
80
    let reader = tokio::spawn(read_loop(read_half, tx, bumper, interrupt));
81

            
82
    let mut remote_channel = 1i64;
83
    while let Some(msg) = rx.recv().await {
84
        match msg {
85
            Inbound::ConnectionInfo { id } => {
86
                let pid = i64::from(std::process::id());
87
                send(
88
                    &mut write_half,
89
                    &events::return_ok(events::connection_info(pid), id),
90
                )
91
                .await?;
92
            }
93
            Inbound::AddLoadPaths { id } => {
94
                send(
95
                    &mut write_half,
96
                    &events::return_ok(sexp::Sexp::Symbol("nil".into()), id),
97
                )
98
                .await?;
99
            }
100
            Inbound::SlynkRequire { id } => {
101
                send(
102
                    &mut write_half,
103
                    &events::return_ok(events::require_modules(), id),
104
                )
105
                .await?;
106
            }
107
            Inbound::CreateMrepl { id, local_channel } => {
108
                remote_channel = local_channel;
109
                send(
110
                    &mut write_half,
111
                    &events::return_ok(events::create_mrepl_ok(remote_channel, THREAD_ID), id),
112
                )
113
                .await?;
114
                // The first prompt is pushed unsolicited to the local channel
115
                // (transcript-confirmed).
116
                send(&mut write_half, &events::prompt(local_channel)).await?;
117
            }
118
            Inbound::Process { channel, source } => {
119
                let outcome = session.handle_request(&source).await;
120
                for frame in dispatch::eval_reply(channel, &outcome) {
121
                    send(&mut write_half, &frame).await?;
122
                }
123
            }
124
            Inbound::LoadFile { id, path } => {
125
                // `slynk:load-file` is a plain rex (no mREPL channel), and SLY
126
                // has no top-level `:write-string` event — `sly-load-file`
127
                // renders the `:ok` value in its transcript. So captured output
128
                // is folded into the returned summary by `load_reply`, not sent
129
                // as a separate frame.
130
                let outcome = session.handle_file(&path).await;
131
                send(&mut write_half, &dispatch::load_reply(id, &outcome)).await?;
132
            }
133
            Inbound::Completions { id, prefix, flex } => {
134
                let names = session.completions(&prefix);
135
                send(
136
                    &mut write_half,
137
                    &dispatch::completion_reply(id, &prefix, flex, &names),
138
                )
139
                .await?;
140
            }
141
            Inbound::Ping { thread, tag } => {
142
                send(&mut write_half, &events::emacs_pong(thread, tag)).await?;
143
            }
144
            Inbound::Interrupt => {
145
                // Already acted on in the reader (epoch bump); nothing to send.
146
            }
147
            Inbound::AbortRex { id } => {
148
                send(&mut write_half, &events::return_abort("unimplemented", id)).await?;
149
            }
150
            Inbound::Ignore => {}
151
        }
152
    }
153
    let _ = remote_channel;
154
    reader.await.ok();
155
    Ok(())
156
}
157

            
158
/// Reads + classifies frames until EOF/error. Forwards each message to the
159
/// eval task; an `:emacs-interrupt` additionally bumps the epoch right here so
160
/// an in-flight eval is cancelled without waiting for the channel to drain.
161
async fn read_loop<R>(
162
    mut read_half: R,
163
    tx: mpsc::Sender<Inbound>,
164
    bumper: rpc::EpochBumper,
165
    interrupt: rpc::InterruptHandle,
166
) where
167
    R: AsyncRead + Unpin,
168
{
169
    loop {
170
        match frame::read_frame(&mut read_half).await {
171
            Ok(Some(raw)) => {
172
                let Ok(parsed) = sexp::parse(&raw) else {
173
                    // Malformed frame: skip it rather than tearing down the
174
                    // connection (matches SLY's tolerance; never panics).
175
                    continue;
176
                };
177
                let msg = dispatch::classify(&parsed);
178
                if matches!(msg, Inbound::Interrupt) {
179
                    // Cancel a running Wasm call (epoch) AND latch for an eval
180
                    // still in its pre-call compile/link phase (the `evaluate`
181
                    // start + pre-`call_async` re-check consume the latch).
182
                    bumper.bump();
183
                    interrupt.interrupt();
184
                }
185
                if tx.send(msg).await.is_err() {
186
                    break; // eval task gone
187
                }
188
            }
189
            Ok(None) => break, // clean EOF
190
            Err(_) => break,   // malformed framing / read error: end connection
191
        }
192
    }
193
}
194

            
195
async fn send<W>(writer: &mut W, payload: &str) -> Result<()>
196
where
197
    W: AsyncWrite + Unpin,
198
{
199
    frame::write_frame(writer, payload)
200
        .await
201
        .context("writing SLYNK frame")
202
}