Files
SimpleRemoter/server/go/web/ws.go

367 lines
10 KiB
Go

package web
import (
"encoding/json"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/hub"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/logger"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/wsauth"
)
// ----- WS framing knobs ---------------------------------------------------
const (
	// wsWriteWait is the deadline applied to each single-frame write.
	wsWriteWait = 10 * time.Second

	// wsReadLimit is the largest incoming frame accepted (1 MB); anything
	// bigger is refused.
	wsReadLimit = 1 << 20

	// wsSendBuffer is the depth of each client's outbound queue.
	wsSendBuffer = 64
)
// upgrader turns an HTTP request into a WebSocket connection using 4096-byte
// read and write buffers.
//
// Every origin is accepted: this service is meant to be tunneled through frp,
// so requests can legitimately arrive from arbitrary front-end hosts.
// Tighten CheckOrigin once there is a deployment story.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  4096,
	WriteBufferSize: 4096,
	CheckOrigin:     func(*http.Request) bool { return true },
}
// ----- per-connection client state ----------------------------------------
// wsMsg is a single WebSocket frame waiting in a client's send queue.
type wsMsg struct {
	// binary selects the frame type: true is sent as
	// websocket.BinaryMessage (screen and terminal packets), false as
	// websocket.TextMessage (JSON signaling).
	binary bool

	// data is the frame payload.
	data []byte
}
// wsClient is the per-connection state of one connected browser.
type wsClient struct {
	conn   *websocket.Conn // underlying WebSocket connection
	send   chan wsMsg      // outbound frames, drained by writeLoop
	closed chan struct{}   // closed by close() to signal shutdown
	once   sync.Once       // makes close() idempotent

	// Everything below is mutated under wsHub.mu (or only by the read loop
	// owning this client).

	// nonce is the outstanding challenge — cleared after a successful login.
	nonce string
	// token is set once authenticated; "" means not logged in.
	token string
	// role mirrors the session role after login.
	role string
	// addr is the client address, kept for logs.
	addr string
	// watching is the device ID this browser is currently streaming, ""
	// when on the list.
	watching string
	// termWatching is the device ID for an open web terminal session, ""
	// otherwise.
	termWatching string
}
// queue schedules payload for delivery as a text (JSON) frame. Delivery is
// best-effort: the frame is dropped silently when the send buffer is full, so
// a stuck reader can't back-pressure the broadcast path.
func (c *wsClient) queue(payload []byte) {
	frame := wsMsg{data: payload} // binary defaults to false → text frame
	c.enqueue(frame)
}
// queueBinary schedules payload for delivery as a binary frame, with the same
// best-effort semantics as queue. Used for screen-stream packets.
func (c *wsClient) queueBinary(payload []byte) {
	frame := wsMsg{binary: true, data: payload}
	c.enqueue(frame)
}
// enqueue offers m to the send queue without ever blocking the caller. The
// frame is discarded when the client has been closed or when the queue is
// full.
func (c *wsClient) enqueue(m wsMsg) {
	select {
	case <-c.closed:
		// Client is shutting down; nothing will drain the queue.
	case c.send <- m:
		// Accepted; writeLoop will pick it up.
	default:
		// Queue full — drop (acceptable for video; signaling clients are
		// typically not behind enough for the small text buffer to fill).
	}
}
// close shuts the client down: it closes the closed channel, which both
// enqueue and writeLoop select on, and closes the underlying connection.
// Guarded by once, so it is safe to call multiple times.
func (c *wsClient) close() {
	shutdown := func() {
		close(c.closed)
		// Best-effort: the connection is being torn down regardless of
		// whether Close reports an error.
		_ = c.conn.Close()
	}
	c.once.Do(shutdown)
}
// ----- ws hub: registry of all connected browsers -------------------------
// wsHub is the registry of all connected browsers. It subscribes to the
// device hub and relays device events to the matching clients.
type wsHub struct {
	auth    *wsauth.Authenticator // authenticator supplied to newWSHub
	devices *hub.Hub              // device hub: event source and screen/terminal teardown
	log     *logger.Logger        // error logging

	mu      sync.RWMutex           // guards clients and the mutable per-client fields
	clients map[*wsClient]struct{} // set of registered connections

	unsub func() // cancels the device-hub subscription; nil after stop
}
// newWSHub builds an empty browser registry and subscribes it to devices so
// that device events start flowing to the hub's On* handlers. The returned
// hub keeps the unsubscribe function for stop.
func newWSHub(auth *wsauth.Authenticator, devices *hub.Hub, log *logger.Logger) *wsHub {
	h := new(wsHub)
	h.auth = auth
	h.devices = devices
	h.log = log
	h.clients = map[*wsClient]struct{}{}

	h.unsub = devices.Subscribe(h)
	return h
}
// stop cancels the device-hub subscription so no further events are
// delivered. Connections that are already open keep running until they close
// on their own. Calling stop again after the first call is a no-op.
func (h *wsHub) stop() {
	if h.unsub == nil {
		return
	}
	h.unsub()
	h.unsub = nil
}
// OnDeviceOnline is part of hub.EventHandler (invoked from hub.Register /
// hub.Unregister). It tells every authenticated browser that the device list
// changed; the device info itself is not forwarded.
func (h *wsHub) OnDeviceOnline(_ hub.DeviceInfo) {
	const devicesChanged = `{"cmd":"devices_changed"}`
	h.broadcastAuthenticated(devicesChanged)
}
// OnDeviceOffline tells every authenticated browser that the device list
// changed. The device ID is not forwarded.
func (h *wsHub) OnDeviceOffline(_ string) {
	const devicesChanged = `{"cmd":"devices_changed"}`
	h.broadcastAuthenticated(devicesChanged)
}
// OnCursorChange forwards the remote cursor index to every authenticated
// browser watching deviceID. The browser maps the index to a CSS cursor
// (desktop) or overlay SVG variant (touch). The device hub is expected to
// de-duplicate, so each call represents a real transition.
func (h *wsHub) OnCursorChange(deviceID string, index byte) {
	payload := mustJSON(map[string]any{
		"cmd":   "cursor",
		"index": index,
	})

	h.mu.RLock()
	defer h.mu.RUnlock()
	for viewer := range h.clients {
		if viewer.token == "" || viewer.watching != deviceID {
			continue
		}
		viewer.queue(payload)
	}
}
// OnResolutionChange sends a "resolution_changed" message with the new frame
// size to every authenticated browser watching deviceID. The browser-side
// WebCodecs decoder needs it to be (re)initialized; without it, binary frames
// arriving after connect_result hit an uninitialized VideoDecoder and the
// page stays on "Waiting for video...".
func (h *wsHub) OnResolutionChange(deviceID string, width, height int) {
	payload := mustJSON(map[string]any{
		"cmd":    "resolution_changed",
		"id":     deviceID,
		"width":  width,
		"height": height,
	})

	h.mu.RLock()
	defer h.mu.RUnlock()
	for viewer := range h.clients {
		if viewer.token == "" || viewer.watching != deviceID {
			continue
		}
		viewer.queue(payload)
	}
}
// OnScreenFrame fans one screen packet out, as a binary frame, to every
// authenticated browser watching deviceID. The read lock is held for the
// whole iteration, but queueBinary never blocks (it drops on backpressure),
// so a slow viewer cannot stall the fast ones. The third argument is ignored.
//
// NOTE(review): packet is queued by reference and written later by each
// client's writeLoop; this assumes the caller does not reuse the slice after
// the call — confirm against the device hub.
func (h *wsHub) OnScreenFrame(deviceID string, packet []byte, _ bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for viewer := range h.clients {
		if viewer.token == "" || viewer.watching != deviceID {
			continue
		}
		viewer.queueBinary(packet)
	}
}
// OnTerminalReady sends "term_ready" to the authenticated browser(s) whose
// terminal session targets deviceID, signalling that the term_open handshake
// completed. The mode field is "pty" when isPTY is true and "legacy"
// otherwise — xterm.js disables the resize callback in legacy mode (no PTY
// behind the cmd pipe).
func (h *wsHub) OnTerminalReady(deviceID string, isPTY bool) {
	var mode string
	if isPTY {
		mode = "pty"
	} else {
		mode = "legacy"
	}
	payload := mustJSON(map[string]any{
		"cmd":  "term_ready",
		"id":   deviceID,
		"mode": mode,
	})

	h.mu.RLock()
	defer h.mu.RUnlock()
	for viewer := range h.clients {
		if viewer.token == "" || viewer.termWatching != deviceID {
			continue
		}
		viewer.queue(payload)
	}
}
// OnTerminalData forwards one chunk of raw shell output (already wrapped in
// the "TRM1" magic header by the caller) as a binary frame to the
// authenticated browser(s) whose terminal session targets deviceID.
// Single-viewer is enforced upstream, so at most one client is expected to
// match per device.
func (h *wsHub) OnTerminalData(deviceID string, packet []byte) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for viewer := range h.clients {
		if viewer.token == "" || viewer.termWatching != deviceID {
			continue
		}
		viewer.queueBinary(packet)
	}
}
// OnTerminalClosed runs when the device's shell exits or the sub-conn drops.
// Each authenticated browser attached to deviceID's terminal receives
// "term_closed" with the given reason so it can close its xterm panel, and
// its termWatching is reset so a later term_open from the same browser isn't
// rejected as "already open" by stale state. The write lock is taken because
// client state is mutated here.
func (h *wsHub) OnTerminalClosed(deviceID string, reason string) {
	payload := mustJSON(map[string]any{
		"cmd":    "term_closed",
		"ok":     true,
		"reason": reason,
	})

	h.mu.Lock()
	defer h.mu.Unlock()
	for viewer := range h.clients {
		if viewer.token == "" || viewer.termWatching != deviceID {
			continue
		}
		viewer.termWatching = ""
		viewer.queue(payload)
	}
}
// OnDeviceUpdate pushes heartbeat-derived liveness data (RTT and the active
// window label) for device id to every authenticated browser, so device-list
// rows can refresh without re-fetching.
func (h *wsHub) OnDeviceUpdate(id string, rtt int, activeWindow string) {
	frame := mustJSON(map[string]any{
		"cmd":          "device_update",
		"id":           id,
		"rtt":          rtt,
		"activeWindow": activeWindow,
	})

	h.mu.RLock()
	defer h.mu.RUnlock()
	for client := range h.clients {
		if client.token == "" {
			continue
		}
		client.queue(frame)
	}
}
// broadcastAuthenticated queues msg as a text frame on every client that has
// a token, i.e. has logged in. Clients still in the challenge phase are
// skipped. All recipients share the same payload slice.
func (h *wsHub) broadcastAuthenticated(msg string) {
	frame := []byte(msg)

	h.mu.RLock()
	defer h.mu.RUnlock()
	for client := range h.clients {
		if client.token == "" {
			continue
		}
		client.queue(frame)
	}
}
// register adds c to the set of connected clients so event handlers can
// reach it.
func (h *wsHub) register(c *wsClient) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.clients[c] = struct{}{}
}
// unregister removes c from the registry, releases the device-side sessions
// it was holding, and closes the connection.
//
// It is called (via defer) from serve once the read loop has exited, so no
// other goroutine touches c's fields after the map delete below.
func (h *wsHub) unregister(c *wsClient) {
	// Deferred so the socket is closed and writeLoop is stopped even if one
	// of the device-hub teardown calls below panics: an upgraded (hijacked)
	// connection is not closed by net/http on the handler's behalf. On the
	// normal path this runs last, after the teardown calls.
	defer c.close()

	h.mu.Lock()
	delete(h.clients, c)
	h.mu.Unlock()

	// If this client was the last viewer of a device, tear down the screen
	// session so the device stops encoding. Done OUTSIDE the lock so the
	// hub's mutators can take their own locks without risk of recursion.
	if c.watching != "" && h.countWatchers(c.watching) == 0 {
		h.devices.CloseScreen(c.watching)
	}

	// Terminal sessions are single-viewer by design, so any open session
	// belongs to this client. Tear it down so the next viewer doesn't
	// hit ErrTerminalBusy from an abandoned session.
	if c.termWatching != "" {
		h.devices.CloseTerminalSession(c.termWatching)
		c.termWatching = ""
	}

	// Do NOT revoke the token: tokens are session-scoped, not WS-scoped.
	// Frontend may close+reopen the WS at any time (visibilitychange handler,
	// brief network blip, reload) and must be able to resume with the same
	// cached token. The token expires on its own TTL.
}
// ----- HTTP handler -------------------------------------------------------
// serve is the HTTP handler for browser WebSocket connections. It upgrades
// the request, caps incoming frame size, generates a login challenge nonce,
// registers the client, starts its write loop, sends the challenge, and then
// runs the read loop on the calling goroutine until the connection fails.
// unregister runs on return and performs all cleanup.
func (h *wsHub) serve(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		h.log.Error("ws upgrade: %v", err)
		return
	}
	conn.SetReadLimit(wsReadLimit)

	nonce, err := wsauth.NewNonce()
	if err != nil {
		h.log.Error("nonce gen: %v", err)
		// The client was never registered, so close the socket directly.
		_ = conn.Close()
		return
	}

	c := &wsClient{
		conn:   conn,
		send:   make(chan wsMsg, wsSendBuffer),
		closed: make(chan struct{}),
		nonce:  nonce,
		addr:   r.RemoteAddr,
	}
	h.register(c)
	defer h.unregister(c)

	go h.writeLoop(c)

	// Greet with a challenge nonce so the browser can compute the login response.
	challenge := []byte(`{"cmd":"challenge","nonce":"` + nonce + `"}`)
	c.queue(challenge)

	h.readLoop(c)
}
// writeLoop is the single writer for c's connection: it drains the send
// queue and writes each frame with a wsWriteWait deadline. It returns when
// the client's closed channel is closed or when a write fails; on a write
// failure it also closes the client, which unblocks the read loop.
func (h *wsHub) writeLoop(c *wsClient) {
	for {
		var frame wsMsg
		select {
		case <-c.closed:
			return
		case frame = <-c.send:
		}

		kind := websocket.TextMessage
		if frame.binary {
			kind = websocket.BinaryMessage
		}

		// A deadline error is ignored here; a broken connection surfaces
		// through WriteMessage right below.
		_ = c.conn.SetWriteDeadline(time.Now().Add(wsWriteWait))
		if c.conn.WriteMessage(kind, frame.data) != nil {
			c.close()
			return
		}
	}
}
// readLoop reads frames from c until the connection returns an error, then
// returns so the caller's deferred unregister can clean up. Each frame is
// decoded just far enough to extract its "cmd" field and is then handed,
// together with the raw bytes, to dispatch. Frames that are not valid JSON
// are skipped.
func (h *wsHub) readLoop(c *wsClient) {
	type envelope struct {
		Cmd string `json:"cmd"`
	}

	for {
		_, raw, err := c.conn.ReadMessage()
		if err != nil {
			return
		}

		var env envelope
		if json.Unmarshal(raw, &env) != nil {
			continue // ignore garbage frames
		}
		h.dispatch(c, env.Cmd, raw)
	}
}