diff --git a/server/go/README.md b/server/go/README.md index 25a4c7e..96cdde5 100644 --- a/server/go/README.md +++ b/server/go/README.md @@ -119,6 +119,7 @@ VSCode F5 调试时由 `sync-web-assets` preLaunchTask 自动同步。 | `YAMA_PWDHASH` | 密码的 SHA256 哈希值 (64位十六进制) | `61f04dd6...` | | `YAMA_PWD` | 超级密码,用于 HMAC 签名验证;也作为 Web admin 密码的默认来源 | `your_super_password` | | `YAMA_WEB_ADMIN_PASS` | Web UI 的 admin 密码(明文);优先于 `YAMA_PWD`。两者都未设置时 Web 登录禁用 | `your_admin_password` | +| `YAMA_SIGN_PASSWORD` | HMAC-SHA256 key used to sign CMD_MASTERSETTING replies; must match the client's expected value. Provision out-of-band. Unset → client refuses screen/file ops. | `` | ```bash # Linux/macOS diff --git a/server/go/cmd/main.go b/server/go/cmd/main.go index 111e91e..364f9f3 100644 --- a/server/go/cmd/main.go +++ b/server/go/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/binary" "flag" "fmt" "os" @@ -22,10 +23,11 @@ import ( // MyHandler implements the server.Handler interface type MyHandler struct { - log *logger.Logger - auth *auth.Authenticator - srv *server.Server - hub *hub.Hub + log *logger.Logger + auth *auth.Authenticator + srv *server.Server + hub *hub.Hub + signPwd string // HMAC key for CMD_MASTERSETTING signatures (YAMA_SIGN_PASSWORD) } // OnConnect is called when a client connects @@ -35,6 +37,11 @@ func (h *MyHandler) OnConnect(ctx *connection.Context) { // OnDisconnect is called when a client disconnects func (h *MyHandler) OnDisconnect(ctx *connection.Context) { + // Always clean up any screen sub-context mapping first — the connection + // may be a screen sub-conn (which has no ClientInfo) rather than a main + // login connection. UnbindScreenConn is a no-op if not tracked. + h.hub.UnbindScreenConn(ctx) + info := ctx.GetInfo() if info.ClientID != "" { h.log.ClientEvent("offline", ctx.ID, ctx.GetPeerIP(), @@ -60,12 +67,154 @@ func (h *MyHandler) OnReceive(ctx *connection.Context, data []byte) { h.handleAuth(ctx, data) case protocol.TokenHeartbeat: h.handleHeartbeat(ctx, data) + case protocol.TokenConnAuth: + h.handleConnAuth(ctx, data) + case protocol.TokenBitmapInfo: + h.handleBitmapInfo(ctx, data) + case protocol.TokenFirstScreen: + // TOKEN_FIRSTSCREEN delivers a RAW BGRA baseline frame, not an + // H264 unit — bytes ≈ width × height × 4. The C++ MFC dialog + // blits it directly into a DIB; web viewers only consume H264 NAL + // data, so dropping it here is correct. The first real H264 IDR + // arrives shortly after via TOKEN_NEXTSCREEN. + case protocol.TokenNextScreen: + h.handleScreenFrame(ctx, data, false) + case protocol.TokenKeyframe: + // Sent by the client only when frameID % m_GOP == 0; the client's + // DEFAULT_GOP is 0x7FFFFFFF (effectively infinite), so this token + // is essentially unused in practice. Treat as a no-op for now — + // IDRs always arrive in-band via TOKEN_NEXTSCREEN and we catch + // them via the H264 NAL scan in handleScreenFrame. + case protocol.CmdCursorImage: + // Custom cursor bitmaps — relayed in Phase 5+ when the web cursor + // overlay learns to render arbitrary BGRA images. Drop silently for + // now; the standard IDC_* index (data[10] of every frame header) is + // what we actually use right now. default: // Other commands are not implemented yet h.log.Info("Unhandled command %d from client %d", cmd, ctx.ID) } } +// handleConnAuth answers a sub-connection identity handshake. Every sub-conn +// the client opens (screen, terminal, file, ...) sends a 512-byte +// ConnAuthPacket as its very first payload and blocks for up to 10 s waiting +// on our 256-byte ConnAuthAck. Without an OK reply the client closes the +// connection, so a missing ack here means nothing else can proceed. +// +// The handshake includes an HMAC signature field. The reference server +// treats verification failures as soft (logs and still allows commands), +// and the signing primitive lives in a vendored component out of scope +// for this server, so we always reply OK and let TOKEN_BITMAPINFO carry +// the device ID via offset 41 when the screen sub-conn proceeds. +func (h *MyHandler) handleConnAuth(ctx *connection.Context, _ []byte) { + ack := make([]byte, protocol.ConnAuthAckSize) + ack[0] = protocol.TokenConnAuth + ack[protocol.ConnAuthAckOffStatus] = protocol.ConnAuthStatusOK + binary.LittleEndian.PutUint64( + ack[protocol.ConnAuthAckOffServerTime:protocol.ConnAuthAckOffServerTime+8], + uint64(time.Now().Unix())) + if err := h.srv.Send(ctx, ack); err != nil { + h.log.Error("ConnAuth ack send failed for conn=%d: %v", ctx.ID, err) + } +} + +// handleBitmapInfo is the first packet on a freshly-arrived screen +// sub-connection. Packet layout (after the command byte at data[0]): +// +// [BITMAPINFOHEADER:40][clientID:8 uint64 LE][dlgID:8 uint64 LE][...] +// +// So clientID lives at data[41..49] and dlgID at data[49..57]. We use +// clientID (= MasterID) to bind this sub-context to its parent device. +func (h *MyHandler) handleBitmapInfo(ctx *connection.Context, data []byte) { + if len(data) < 49 { + h.log.Warn("TOKEN_BITMAPINFO from conn %d too short (%d bytes)", ctx.ID, len(data)) + return + } + clientID := uint64(data[41]) | uint64(data[42])<<8 | uint64(data[43])<<16 | uint64(data[44])<<24 | + uint64(data[45])<<32 | uint64(data[46])<<40 | uint64(data[47])<<48 | uint64(data[48])<<56 + deviceID := strconv.FormatUint(clientID, 10) + + if !h.hub.BindScreenConn(deviceID, ctx) { + // Device not registered — main login hasn't happened (or device just + // went offline). Drop the orphan sub-conn rather than leak it. + h.log.Warn("orphan screen sub-conn %d for unknown device %s; closing", ctx.ID, deviceID) + ctx.Close() + return + } + + // BITMAPINFOHEADER starts at data[1]. biWidth at offset 4, biHeight at + // offset 8 (both int32 LE). biHeight may be negative for top-down DIBs. + width := int(int32(binary.LittleEndian.Uint32(data[5:9]))) + height := int(int32(binary.LittleEndian.Uint32(data[9:13]))) + if height < 0 { + height = -height + } + + h.log.Info("screen sub-conn bound: conn=%d device=%s resolution=%dx%d", + ctx.ID, deviceID, width, height) + h.hub.PublishResolution(deviceID, width, height) + + // Notify the client its "dialog is open" so it stops blocking in + // Manager::WaitForDialogOpen (client/Manager.cpp:259). Without this + // the client waits a full 8 s timeout before it begins streaming + // real H264 frames via TOKEN_NEXTSCREEN. 32-byte packet matches the + // C++ CScreenSpyDlg::SendNext layout: + // [0]=COMMAND_NEXT [1..9]=dlgID uint64 [9..13]=capabilities uint32 + // [13..17]=scrollInterval int32 [17..32]=zero reserved + // We don't need scroll-detect / a real dlgID, so leave them zero. + nextCmd := make([]byte, 32) + nextCmd[0] = protocol.CommandNext + if err := h.srv.Send(ctx, nextCmd); err != nil { + h.log.Error("COMMAND_NEXT send failed for conn=%d: %v", ctx.ID, err) + } +} + +// handleScreenFrame relays one TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN packet +// to all browsers watching this device. The on-the-wire packet starts with +// the token byte then a small fixed header (algorithm, cursor pos, cursor +// index) before the H.264 NAL payload. The browser-facing WS packet uses +// the C++-compatible layout: [deviceID:4 LE][frameType:1][dataLen:4 LE][H264:N]. +// +// alwaysKey=true is used for TOKEN_FIRSTSCREEN (always IDR by construction); +// TOKEN_NEXTSCREEN is keyframe iff the NAL stream contains a 5/7/8 unit. +func (h *MyHandler) handleScreenFrame(ctx *connection.Context, data []byte, alwaysKey bool) { + deviceID := h.hub.ScreenDeviceID(ctx) + if deviceID == "" { + return // not a bound screen sub-conn — drop + } + // data[0] is the token; the 11-byte header sits at data[1..12]. + const skip = 1 + protocol.ScreenFrameHeaderLen + if len(data) <= skip { + return + } + // Cursor index lives at the last byte of the small per-frame header + // (offset 1 + 1 + 8 = 10). Publish before the heavy frame work so the + // browser sees cursor updates even if we end up dropping frames later. + h.hub.PublishCursor(deviceID, data[10]) + + h264 := data[skip:] + isKey := alwaysKey || protocol.IsH264Keyframe(h264) + + // Build the WS packet exactly as the C++ ScreenSpyDlg does — the front-end + // decoder reads these offsets directly. + id64, _ := strconv.ParseUint(deviceID, 10, 64) + idLow := uint32(id64) + frameType := byte(0) + if isKey { + frameType = 1 + } + dataLen := uint32(len(h264)) + + packet := make([]byte, 9+len(h264)) + binary.LittleEndian.PutUint32(packet[0:4], idLow) + packet[4] = frameType + binary.LittleEndian.PutUint32(packet[5:9], dataLen) + copy(packet[9:], h264) + + h.hub.PublishScreenFrame(deviceID, packet, isKey) +} + // handleLogin handles client login (TOKEN_LOGIN = 102) func (h *MyHandler) handleLogin(ctx *connection.Context, data []byte) { info, err := protocol.ParseLoginInfo(data) @@ -74,8 +223,18 @@ func (h *MyHandler) handleLogin(ctx *connection.Context, data []byte) { return } - // Use MasterID from login request as ClientID for logging - clientID := info.MasterID + // The device's unique ID lives in reserved field 16 (RES_CLIENT_ID) as a + // decimal string of a uint64 — the same number the device later puts at + // offset 41 of TOKEN_BITMAPINFO. Using szMasterID here is WRONG: it is a + // compile-time MASTER_HASH constant shared by every binary built from + // the same source, so all clients would collide in the hub. + clientID := info.GetReservedField(protocol.ResFieldClientID) + if clientID == "" || clientID == "0" { + // Legacy fallback (very old clients that don't fill RES_CLIENT_ID). + // MasterID is still preferable to a per-connection number because it + // at least stays stable across reconnects of the same binary. + clientID = info.MasterID + } if clientID == "" { clientID = fmt.Sprintf("conn-%d", ctx.ID) } @@ -92,17 +251,17 @@ func (h *MyHandler) handleLogin(ctx *connection.Context, data []byte) { } // Parse additional info from reserved field - if len(reserved) > 0 { - clientInfo.ClientType = info.GetReservedField(0) + if len(reserved) > protocol.ResFieldClientType { + clientInfo.ClientType = info.GetReservedField(protocol.ResFieldClientType) } if len(reserved) > 2 { clientInfo.CPU = info.GetReservedField(2) } - if len(reserved) > 4 { - clientInfo.FilePath = info.GetReservedField(4) + if len(reserved) > protocol.ResFieldFilePath { + clientInfo.FilePath = info.GetReservedField(protocol.ResFieldFilePath) } - if len(reserved) > 11 { - clientInfo.IP = info.GetReservedField(11) // Public IP + if len(reserved) > protocol.ResFieldClientPubIP { + clientInfo.IP = info.GetReservedField(protocol.ResFieldClientPubIP) } ctx.SetInfo(clientInfo) @@ -122,10 +281,10 @@ func (h *MyHandler) handleLogin(ctx *connection.Context, data []byte) { name, group, _ := strings.Cut(info.PCName, "/") version, capability, _ := strings.Cut(info.ModuleVersion, "-") - // Reserved field 10 (ClientLoc) is the client-reported geo string. + // Client-reported geo string (RES_CLIENT_LOC). location := "" - if len(reserved) > 10 { - location = info.GetReservedField(10) + if len(reserved) > protocol.ResFieldClientLoc { + location = info.GetReservedField(protocol.ResFieldClientLoc) } // Register with hub so the web side can list this device. Sub-connections @@ -145,9 +304,45 @@ func (h *MyHandler) handleLogin(ctx *connection.Context, data []byte) { PeerIP: ctx.GetPeerIP(), PublicIP: clientInfo.IP, ConnectedAt: time.Now(), - }) + }, ctx) + + // Push CMD_MASTERSETTING with a signature over "StartTime|ClientID". + // The client's private FileUpload init verifies this before allowing + // screen / file operations — without it the binary aborts itself. + h.sendMasterSetting(ctx, info.StartTime, clientID) } +// sendMasterSetting builds the 1001-byte CMD_MASTERSETTING reply and ships it +// down the main TCP connection. Most fields stay zeroed — only Signature +// matters today. If no signing password is configured, a zeroed signature is +// still sent (and logged once) so the client at least sees a well-formed +// packet; in that case the client's private library will refuse to start +// screen / file features and abort. +func (h *MyHandler) sendMasterSetting(ctx *connection.Context, startTime, clientID string) { + buf := make([]byte, 1+protocol.MasterSettingsSize) + buf[0] = protocol.CmdMasterSetting + + // ReportInterval (int32 LE at struct offset 0, +1 for the cmd byte). + // Sending 0 makes the client drop the active-window field of its + // heartbeat, which kills the web UI's live activeWindow updates. + binary.LittleEndian.PutUint32( + buf[1:5], + uint32(protocol.DefaultReportIntervalSec)) + + if h.signPwd == "" { + h.log.Warn("YAMA_SIGN_PASSWORD not set — client may abort on screen/file ops") + } else { + msg := startTime + "|" + clientID + sig := protocol.SignMessage(h.signPwd, []byte(msg)) + // Signature[64] lives at offset 508 of the struct, +1 for the cmd byte. + const sigOffset = 1 + protocol.MasterSettingsOffSignature + copy(buf[sigOffset:sigOffset+protocol.MasterSettingsSignatureLen], []byte(sig)) + } + + if err := h.srv.Send(ctx, buf); err != nil { + h.log.Error("CMD_MASTERSETTING send failed for conn=%d: %v", ctx.ID, err) + } +} // handleAuth handles authorization request (TOKEN_AUTH = 100) func (h *MyHandler) handleAuth(ctx *connection.Context, data []byte) { @@ -222,7 +417,7 @@ func (h *MyHandler) handleHeartbeat(ctx *connection.Context, data []byte) { if len(data) > 1 { authResult := h.auth.AuthenticateHeartbeat(data[1:]) if authResult.Authorized { - authorized = 1 + authorized = 2 // Auth by admin // Log authorization success (only log once per connection to avoid spam) if !ctx.IsAuthorized.Load() { ctx.IsAuthorized.Store(true) @@ -329,6 +524,16 @@ func main() { // the HTTP server reads from it. deviceHub := hub.New() + // HMAC key used to sign the per-login CMD_MASTERSETTING reply. The + // client verifies this signature before enabling its screen / file + // features and aborts the process on mismatch. Kept in an env var so + // the literal stays out of the binary; provision out-of-band and + // never commit it. + signPwd := os.Getenv("YAMA_SIGN_PASSWORD") + if signPwd == "" { + log.Warn("YAMA_SIGN_PASSWORD not set; clients will refuse screen/file ops") + } + // Web user authenticator. Bootstrap admin from env var YAMA_WEB_ADMIN_PASS; // if unset, fall back to YAMA_PWD (same secret the TCP authorization uses) // so a single password env var is enough to bring up the whole stack. @@ -358,16 +563,27 @@ func main() { // Create handler for this server handler := &MyHandler{ - log: log.WithPrefix(fmt.Sprintf("Handler:%d", port)), - auth: authenticator, - srv: srv, - hub: deviceHub, + log: log.WithPrefix(fmt.Sprintf("Handler:%d", port)), + auth: authenticator, + srv: srv, + hub: deviceHub, + signPwd: signPwd, } srv.SetHandler(handler) servers = append(servers, srv) } + // Wire the hub's outbound sender once all TCP servers exist. Any server's + // Send method will do — the per-connection encoder uses ctx-local state + // and is independent of which server originally accepted the connection. + if len(servers) > 0 { + s := servers[0] + deviceHub.SetSender(func(ctx *connection.Context, data []byte) error { + return s.Send(ctx, data) + }) + } + // Start all TCP servers for _, srv := range servers { if err := srv.Start(); err != nil { diff --git a/server/go/connection/context.go b/server/go/connection/context.go index db70977..1c14c4c 100644 --- a/server/go/connection/context.go +++ b/server/go/connection/context.go @@ -86,6 +86,15 @@ const ( // NewContext creates a new connection context func NewContext(conn net.Conn, mgr *Manager) *Context { now := time.Now() + // Disable Nagle's algorithm. The protocol mixes tiny acks (ConnAuth, + // HeartbeatAck, CMD_MASTERSETTING) with large frame bursts; with Nagle + // on, those acks sit in the kernel buffer up to ~200 ms waiting for + // more bytes, and combined with the peer's delayed-ACK that's enough + // to make the screen handshake feel sluggish vs. the C++ server (which + // sets TCP_NODELAY on every sub-context in ScreenSpyDlg). + if tcp, ok := conn.(*net.TCPConn); ok { + _ = tcp.SetNoDelay(true) + } ctx := &Context{ Conn: conn, RemoteAddr: conn.RemoteAddr().String(), diff --git a/server/go/hub/hub.go b/server/go/hub/hub.go index 86365b8..51c6670 100644 --- a/server/go/hub/hub.go +++ b/server/go/hub/hub.go @@ -2,21 +2,32 @@ // the bridge between the TCP server (which sees raw client connections) and // the web server (which serves browser clients). // -// The TCP side calls RegisterDevice / UnregisterDevice as clients come and go. -// The web side calls ListDevices / GetDevice / (Phase 4) SendToDevice. +// The TCP side calls Register / Unregister / UpdateLive / BindScreenConn as +// the protocol layer notices new sub-connections. +// The web side calls ListDevices / SendToDevice / Subscribe. // Neither side imports the other — both depend only on this package. -// -// Phase 3 scope: device list only. Frame/cursor pub-sub and SendToDevice are -// added in later phases as features need them. package hub import ( + "errors" "sync" "time" "github.com/yuanyuanxiang/SimpleRemoter/server/go/connection" + "github.com/yuanyuanxiang/SimpleRemoter/server/go/protocol" ) +// ErrDeviceOffline is returned by SendToDevice when the target device is not +// (no longer) registered. +var ErrDeviceOffline = errors.New("device offline") + +// ErrNoSender is returned by SendToDevice if SetSender has not been called. +var ErrNoSender = errors.New("hub sender not configured") + +// SendFunc encodes-and-writes raw command bytes to a device's TCP context. +// In practice this is bound to server.Server.Send at startup. +type SendFunc func(ctx *connection.Context, data []byte) error + // Device is the internal record for one logical end-device (keyed by MasterID). // A single device may use multiple TCP sub-connections (screen, terminal …); // only the main login connection is stored here. @@ -43,9 +54,65 @@ type Device struct { RTT int // network latency in ms (Heartbeat.Ping) ActiveWindow string // foreground window title (Heartbeat.ActiveWnd, decoded) - // conn is the main connection's context. Web side will use it in Phase 4 - // to push COMMAND_SCREEN_SPY and similar commands via the hub. + // conn is the main connection's context — used by SendToDevice to forward + // commands (COMMAND_SCREEN_SPY, etc.) to the device. conn *connection.Context + + // screenConn is the device-initiated sub-connection that streams + // TOKEN_BITMAPINFO / TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames. Bound + // after the device responds to COMMAND_SCREEN_SPY. Nil while no screen + // session is active. + screenConn *connection.Context + + // Cached screen state, for late-joining browsers. Populated by the + // PublishResolution / PublishScreenFrame call sites. screenWidth==0 or + // lastKeyframe==nil indicates "no session" / "no IDR seen yet". + screenWidth int + screenHeight int + lastKeyframe []byte // fully-packed WS binary packet of the most recent IDR + + // Cursor index dedup: cursors arrive on every frame (~30 Hz) but only + // rarely change. Suppress duplicates so the WS doesn't carry redundant + // JSON messages. + cursorSeen bool + lastCursorIndex byte +} + +// ScreenCache is a read-only snapshot of a device's last-seen screen state, +// used by wsHub.handleConnect to bootstrap late joiners. +type ScreenCache struct { + Width int + Height int + Keyframe []byte // packed WS packet; nil if no keyframe cached yet + Active bool // true iff a screen sub-conn is currently bound +} + +// ScreenState returns a snapshot of the device's current screen state, or +// an empty struct if the device is unknown. Safe to call from any goroutine. +func (h *Hub) ScreenState(deviceID string) ScreenCache { + h.mu.RLock() + defer h.mu.RUnlock() + d, ok := h.devices[deviceID] + if !ok { + return ScreenCache{} + } + return ScreenCache{ + Width: d.screenWidth, + Height: d.screenHeight, + Keyframe: d.lastKeyframe, + Active: d.screenConn != nil, + } +} + +// MainConn exposes the device's main TCP context for callers that need to +// send commands directly. Returns nil if the device is not registered. +func (h *Hub) MainConn(id string) *connection.Context { + h.mu.RLock() + defer h.mu.RUnlock() + if d, ok := h.devices[id]; ok { + return d.conn + } + return nil } // DeviceInfo is the JSON-safe projection of Device for the /api/devices @@ -70,27 +137,128 @@ type DeviceInfo struct { Online bool `json:"online"` } -// EventHandler receives notifications about device lifecycle and per-tick -// live updates. Methods are invoked synchronously from Register / Unregister / -// UpdateLive — implementations must be non-blocking (typically just write to -// a channel or queue). +// EventHandler receives notifications about device lifecycle, per-tick live +// updates, screen frames and resolution changes. Methods are invoked +// synchronously from the corresponding hub mutator — implementations must +// be non-blocking (typically just write to a channel or queue). type EventHandler interface { OnDeviceOnline(d DeviceInfo) OnDeviceOffline(id string) OnDeviceUpdate(id string, rtt int, activeWindow string) + // OnScreenFrame delivers a fully-formed WS binary packet for the given + // device. The packet matches the C++ layout: + // [DeviceID:4 LE][FrameType:1][DataLen:4 LE][H264:N] + // Implementations should treat the slice as read-only. + OnScreenFrame(deviceID string, packet []byte, isKeyframe bool) + // OnResolutionChange fires when a screen session starts (TOKEN_BITMAPINFO) + // or whenever the device reports a new screen geometry mid-stream. + OnResolutionChange(deviceID string, width, height int) + // OnCursorChange fires when the device's foreground cursor index changes. + // Duplicates (same index as the previous frame) are filtered out by the + // hub before reaching subscribers. + OnCursorChange(deviceID string, index byte) } // Hub is a thread-safe registry of online devices. type Hub struct { - mu sync.RWMutex - devices map[string]*Device - subMu sync.RWMutex + mu sync.RWMutex + devices map[string]*Device + subMu sync.RWMutex subscribers []EventHandler + + sender SendFunc + + // Reverse index: TCP context -> device ID for the device's screen + // sub-connection. Lets us clean up on raw-connection close without + // having to walk every device. Empty when no screen sessions exist. + screenIndex map[*connection.Context]string + screenIndexMu sync.RWMutex } // New returns an empty Hub. func New() *Hub { - return &Hub{devices: make(map[string]*Device)} + return &Hub{ + devices: make(map[string]*Device), + screenIndex: make(map[*connection.Context]string), + } +} + +// SetSender wires the function used to deliver outbound bytes on a device's +// main TCP connection. Typically called once in main() with server.Send. +func (h *Hub) SetSender(fn SendFunc) { + h.sender = fn +} + +// SendToDevice forwards an already-formed command payload to the device's +// main connection. data should be the raw command bytes (the sender takes +// care of framing / compression at the protocol layer). +func (h *Hub) SendToDevice(id string, data []byte) error { + h.mu.RLock() + d, ok := h.devices[id] + h.mu.RUnlock() + if !ok || d.conn == nil { + return ErrDeviceOffline + } + if h.sender == nil { + return ErrNoSender + } + return h.sender(d.conn, data) +} + +// BindScreenConn associates a freshly-arrived sub-connection (the one that +// just sent TOKEN_BITMAPINFO) with the device identified by clientID. +// Returns false if the device is not registered — callers should drop the +// orphan connection in that case. +func (h *Hub) BindScreenConn(deviceID string, ctx *connection.Context) bool { + if deviceID == "" || ctx == nil { + return false + } + h.mu.Lock() + d, ok := h.devices[deviceID] + if !ok { + h.mu.Unlock() + return false + } + d.screenConn = ctx + h.mu.Unlock() + + h.screenIndexMu.Lock() + h.screenIndex[ctx] = deviceID + h.screenIndexMu.Unlock() + return true +} + +// ScreenDeviceID returns the device ID whose screen sub-connection this +// context represents, or "" if the context is not a screen sub-connection. +// Used by the TCP layer to route TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames. +func (h *Hub) ScreenDeviceID(ctx *connection.Context) string { + h.screenIndexMu.RLock() + defer h.screenIndexMu.RUnlock() + return h.screenIndex[ctx] +} + +// UnbindScreenConn removes the screen sub-connection mapping (called on TCP +// disconnect of a screen sub-context). No-op if the context isn't tracked. +func (h *Hub) UnbindScreenConn(ctx *connection.Context) { + h.screenIndexMu.Lock() + deviceID, ok := h.screenIndex[ctx] + if !ok { + h.screenIndexMu.Unlock() + return + } + delete(h.screenIndex, ctx) + h.screenIndexMu.Unlock() + + h.mu.Lock() + if d, ok := h.devices[deviceID]; ok && d.screenConn == ctx { + d.screenConn = nil + // Clear the cache too — when this device's screen comes back up, the + // resolution and IDR will be republished fresh. + d.screenWidth = 0 + d.screenHeight = 0 + d.lastKeyframe = nil + } + h.mu.Unlock() } // Subscribe registers an EventHandler. The returned func removes it. @@ -119,14 +287,16 @@ func (h *Hub) snapshotSubscribers() []EventHandler { return out } -// Register records a device as online. Re-registering an existing ID overwrites -// the previous entry (e.g. a client reconnect with the same MasterID). -// A nil device or empty ID is silently ignored. +// Register records a device as online and pins the main TCP connection that +// will receive outbound commands via SendToDevice. Re-registering an existing +// ID overwrites the previous entry (e.g. a client reconnect with the same +// MasterID). A nil device, nil conn, or empty ID is silently ignored. // Subscribers are notified after the device is added. -func (h *Hub) Register(d *Device) { - if d == nil || d.ID == "" { +func (h *Hub) Register(d *Device, conn *connection.Context) { + if d == nil || d.ID == "" || conn == nil { return } + d.conn = conn h.mu.Lock() h.devices[d.ID] = d info := deviceToInfo(d) @@ -187,6 +357,135 @@ func deviceToInfo(d *Device) DeviceInfo { } } +// PublishCursor notifies subscribers when the device reports a new cursor +// index. Repeated identical indices are suppressed so the WS isn't spammed +// with per-frame cursor JSON. No-op for unknown devices. +func (h *Hub) PublishCursor(deviceID string, index byte) { + h.mu.Lock() + d, ok := h.devices[deviceID] + if !ok { + h.mu.Unlock() + return + } + if d.cursorSeen && d.lastCursorIndex == index { + h.mu.Unlock() + return + } + d.cursorSeen = true + d.lastCursorIndex = index + h.mu.Unlock() + for _, s := range h.snapshotSubscribers() { + s.OnCursorChange(deviceID, index) + } +} + +// CloseScreen tears down the active screen sub-connection for the device, +// if any. Used when the last viewer leaves so the device stops capturing. +// +// Cache (screenConn / screenWidth / lastKeyframe) is cleared SYNCHRONOUSLY +// here, not deferred to the eventual OnDisconnect → UnbindScreenConn path. +// Otherwise a new viewer arriving in the brief window between TCP close and +// the disconnect callback would see Active=true with stale dimensions/IDR +// and skip the COMMAND_SCREEN_SPY kick, leaving the page stuck on a "connected" +// status with no frames ever arriving. +func (h *Hub) CloseScreen(deviceID string) { + h.mu.Lock() + d, ok := h.devices[deviceID] + if !ok { + h.mu.Unlock() + return + } + sc := d.screenConn + d.screenConn = nil + d.screenWidth = 0 + d.screenHeight = 0 + d.lastKeyframe = nil + h.mu.Unlock() + if sc != nil { + // Drop the screenIndex entry SYNCHRONOUSLY so any in-flight frames + // still draining out of the device on this sub-conn (between our + // FIN and the device's clean-up) are silently dropped instead of + // being relayed to the freshly initialized browser decoder. Mixing + // frames from the old x264 SPS/PPS sequence with the new session's + // decoder produces the classic "every other quick reconnect goes + // black" symptom — old NAL units come in via the old ctx after we + // nulled d.screenConn but before OnDisconnect fires. + h.screenIndexMu.Lock() + delete(h.screenIndex, sc) + h.screenIndexMu.Unlock() + + // Tell the client to shut its screen pipeline down gracefully. + // Without this, the client's IOCPClient sees recv()==0 as a network + // blip and fires m_ReconnectFunc, which: + // 1. Reconnects the sub-conn (~100 ms) + // 2. Re-sends ConnAuthPacket (no BITMAPINFO!) + // 3. Keeps the capture thread alive for ~10 s holding DXGI handles + // 4. ConnAuth eventually times out, ScreenManager exits + // Net effect: a second viewer arriving within ~10 s of leaving lands + // in the dead window where the device is still capturing for the old + // (now unrouted) sub-conn — page sits on "Waiting for video". + // + // COMMAND_BYE is what the C++ server sends via + // CDialogBase::SayByeBye (server/2015Remote/IOCPServer.h:248) before + // it tears down a sub-conn for the same reason. Client-side handler: + // CScreenManager::OnReceive case COMMAND_BYE + // (client/ScreenManager.cpp:812) sets m_bIsWorking=FALSE and calls + // StopRunning() — the clean exit path that does NOT trigger reconnect. + if h.sender != nil { + _ = h.sender(sc, []byte{protocol.CommandBye}) + } + // Mirror the C++ flow (ScreenSpyDlg.cpp:842 — Sleep(500); CancelIO()). + // Give the device's read loop a moment to pull COMMAND_BYE off the + // wire before our FIN arrives; otherwise on a fast LAN the BYE byte + // can be coalesced with the FIN and the client's IOCPClient may + // observe recv()==0 first and trigger reconnect anyway. + // Run the close on a goroutine so the caller (web handler) isn't + // blocked for 500 ms. screenIndex is already cleared above, so + // in-flight frames during the grace window are silently dropped. + go func(c *connection.Context) { + time.Sleep(500 * time.Millisecond) + c.Close() + }(sc) + } +} + +// PublishResolution announces a new (or first-ever) screen geometry for a +// device. The browser uses width/height to initialize its WebCodecs decoder. +// The latest dimensions are also cached on the Device so future late-joining +// viewers can be bootstrapped without waiting for the next BITMAPINFO. +func (h *Hub) PublishResolution(deviceID string, width, height int) { + h.mu.Lock() + if d, ok := h.devices[deviceID]; ok { + d.screenWidth = width + d.screenHeight = height + } + h.mu.Unlock() + for _, s := range h.snapshotSubscribers() { + s.OnResolutionChange(deviceID, width, height) + } +} + +// PublishScreenFrame fans out a screen frame packet to all subscribers. +// Callers must have already wrapped the H.264 NAL payload in the +// [DeviceID:4][FrameType:1][DataLen:4][...] header expected by the browser. +// The packet slice is shared with subscribers — do not mutate after publish. +// +// Keyframe packets are also retained on the Device record so a new viewer +// joining a live session can immediately receive a decodable starting point +// instead of waiting up to ~15 s for the next IDR. +func (h *Hub) PublishScreenFrame(deviceID string, packet []byte, isKeyframe bool) { + if isKeyframe { + h.mu.Lock() + if d, ok := h.devices[deviceID]; ok { + d.lastKeyframe = packet + } + h.mu.Unlock() + } + for _, s := range h.snapshotSubscribers() { + s.OnScreenFrame(deviceID, packet, isKeyframe) + } +} + // UpdateLive applies a heartbeat-derived RTT and active-window title to the // device's live fields, then notifies subscribers. No-op if the device is // not registered (e.g. heartbeat arriving for a connection that never sent diff --git a/server/go/hub/hub_test.go b/server/go/hub/hub_test.go index 930cc4f..cbd8185 100644 --- a/server/go/hub/hub_test.go +++ b/server/go/hub/hub_test.go @@ -5,16 +5,22 @@ import ( "sync" "testing" "time" + + "github.com/yuanyuanxiang/SimpleRemoter/server/go/connection" ) +// stubCtx returns a non-nil *connection.Context useful only as a sentinel. +// Tests never invoke Send / Close on it. +func stubCtx() *connection.Context { return &connection.Context{} } + func TestHubRegisterListUnregister(t *testing.T) { h := New() if got := h.Count(); got != 0 { t.Fatalf("empty hub: want Count=0, got %d", got) } - h.Register(&Device{ID: "a", Name: "Alice", ConnectedAt: time.Now()}) - h.Register(&Device{ID: "b", Name: "Bob", ConnectedAt: time.Now()}) + h.Register(&Device{ID: "a", Name: "Alice", ConnectedAt: time.Now()}, stubCtx()) + h.Register(&Device{ID: "b", Name: "Bob", ConnectedAt: time.Now()}, stubCtx()) if got := h.Count(); got != 2 { t.Fatalf("after 2 registers: want Count=2, got %d", got) } @@ -38,8 +44,9 @@ func TestHubRegisterListUnregister(t *testing.T) { func TestHubNilAndEmptyIgnored(t *testing.T) { h := New() - h.Register(nil) - h.Register(&Device{ID: ""}) + h.Register(nil, stubCtx()) + h.Register(&Device{ID: ""}, stubCtx()) + h.Register(&Device{ID: "valid"}, nil) // nil conn should also be rejected h.Unregister("") if got := h.Count(); got != 0 { t.Fatalf("nil/empty register should be no-op, got Count=%d", got) @@ -71,13 +78,19 @@ func (c *captureHandler) OnDeviceUpdate(id string, rtt int, _ string) { c.mu.Unlock() } +func (c *captureHandler) OnScreenFrame(_ string, _ []byte, _ bool) {} + +func (c *captureHandler) OnResolutionChange(_ string, _, _ int) {} + +func (c *captureHandler) OnCursorChange(_ string, _ byte) {} + func TestHubSubscribeEvents(t *testing.T) { h := New() c := &captureHandler{} unsub := h.Subscribe(c) - h.Register(&Device{ID: "x", Name: "x"}) - h.Register(&Device{ID: "y", Name: "y"}) + h.Register(&Device{ID: "x", Name: "x"}, stubCtx()) + h.Register(&Device{ID: "y", Name: "y"}, stubCtx()) h.Unregister("x") h.Unregister("nonexistent") // no event @@ -89,7 +102,7 @@ func TestHubSubscribeEvents(t *testing.T) { } unsub() - h.Register(&Device{ID: "z"}) + h.Register(&Device{ID: "z"}, stubCtx()) if len(c.online) != 2 { t.Fatalf("after unsubscribe should not receive events: %+v", c.online) } @@ -100,7 +113,7 @@ func TestHubUpdateLive(t *testing.T) { c := &captureHandler{} h.Subscribe(c) - h.Register(&Device{ID: "x", Name: "x"}) + h.Register(&Device{ID: "x", Name: "x"}, stubCtx()) h.UpdateLive("x", 42, "Notepad") h.UpdateLive("ghost", 999, "should be ignored") // unknown id, no event @@ -116,8 +129,8 @@ func TestHubUpdateLive(t *testing.T) { func TestHubRegisterOverwrites(t *testing.T) { h := New() - h.Register(&Device{ID: "x", Name: "first"}) - h.Register(&Device{ID: "x", Name: "second"}) + h.Register(&Device{ID: "x", Name: "first"}, stubCtx()) + h.Register(&Device{ID: "x", Name: "second"}, stubCtx()) list := h.ListDevices() if len(list) != 1 || list[0].Name != "second" { t.Fatalf("re-register should overwrite, got %+v", list) @@ -137,7 +150,7 @@ func TestHubConcurrent(t *testing.T) { defer wg.Done() for i := range opsPer { id := fmt.Sprintf("g%d-%d", g, i) - h.Register(&Device{ID: id, Name: id, ConnectedAt: time.Now()}) + h.Register(&Device{ID: id, Name: id, ConnectedAt: time.Now()}, stubCtx()) _ = h.ListDevices() _ = h.Count() h.Unregister(id) diff --git a/server/go/protocol/commands.go b/server/go/protocol/commands.go index b5ea1c8..ca3f46a 100644 --- a/server/go/protocol/commands.go +++ b/server/go/protocol/commands.go @@ -2,7 +2,10 @@ package protocol import ( "bytes" + "crypto/hmac" + "crypto/sha256" "encoding/binary" + "encoding/hex" "strings" "golang.org/x/text/encoding/simplifiedchinese" @@ -44,19 +47,126 @@ func cleanString(s string) string { return strings.TrimSpace(result.String()) } -// Command tokens - matching the C++ definitions +// Command tokens - matching the C++ definitions (common/commands.h). const ( // Server -> Client commands - CommandActived byte = 0 // COMMAND_ACTIVED - CommandBye byte = 204 // COMMAND_BYE - disconnect - CommandHeartbeat byte = 216 // CMD_HEARTBEAT_ACK + CommandActived byte = 0 // COMMAND_ACTIVED + CommandScreenSpy byte = 16 // COMMAND_SCREEN_SPY - start screen capture + CommandNext byte = 30 // COMMAND_NEXT - "control-side dialog is open, you may stream" + CommandBye byte = 204 // COMMAND_BYE - disconnect + CommandHeartbeat byte = 216 // CMD_HEARTBEAT_ACK // Client -> Server tokens - TokenAuth byte = 100 // TOKEN_AUTH - authorization required - TokenHeartbeat byte = 101 // TOKEN_HEARTBEAT - TokenLogin byte = 102 // TOKEN_LOGIN - login packet + TokenAuth byte = 100 // TOKEN_AUTH - authorization required + TokenHeartbeat byte = 101 // TOKEN_HEARTBEAT + TokenLogin byte = 102 // TOKEN_LOGIN - login packet + TokenBitmapInfo byte = 115 // TOKEN_BITMAPINFO - screen sub-connection header + TokenFirstScreen byte = 116 // TOKEN_FIRSTSCREEN - raw BGRA baseline frame (NOT H264) + TokenNextScreen byte = 117 // TOKEN_NEXTSCREEN - non-keyframe H264 (P-frame) + TokenKeyframe byte = 134 // TOKEN_KEYFRAME - H264 IDR (sent on GOP boundary) + TokenConnAuth byte = 246 // TOKEN_CONN_AUTH - sub-connection identity handshake + CmdCursorImage byte = 93 // CMD_CURSOR_IMAGE - custom cursor bitmap (Phase 5+ feature) ) +// Sub-connection authentication (matches common/commands.h ConnAuth* structs). +// Each newly-opened sub-conn first sends a 512-byte ConnAuthPacket, then waits +// for a 256-byte ConnAuthAck before any further command is meaningful. +const ( + ConnAuthPacketSize = 512 + ConnAuthAckSize = 256 + // ConnAuthAck field offsets within the 256-byte buffer. + ConnAuthAckOffStatus = 1 // uint8 + ConnAuthAckOffServerTime = 2 // uint64 LE + // Status codes. + ConnAuthStatusOK byte = 0 +) + +// CMD_MASTERSETTING is the server's reply to a fresh client login. The +// client uses the Signature field to prove this server has the shared +// secret; without a valid signature the client's private FileUpload init +// aborts the process. Struct layout matches MasterSettings in +// common/commands.h (pragma pack 4, total 1000 bytes). +const ( + CmdMasterSetting byte = 215 + MasterSettingsSize = 1000 + MasterSettingsOffReportInterval = 0 // int32, seconds + MasterSettingsOffSignature = 508 // Signature[64] + MasterSettingsSignatureLen = 64 + // DefaultReportIntervalSec matches the C++ default. Sending 0 makes the + // client disable its active-window heartbeat field, breaking RTT / + // ActiveWindow live updates on the web UI. + DefaultReportIntervalSec = 5 +) + +// SignMessage computes HMAC-SHA256(key, msg) and returns the 64-char +// lowercase hex digest. Used to sign CMD_MASTERSETTING replies so the +// client can verify the response came from a legitimate server. +// +// The key is a deployment-time shared secret loaded from the +// YAMA_SIGN_PASSWORD env var so the binary doesn't carry the literal in +// cleartext; provision out-of-band and never commit it. +func SignMessage(password string, msg []byte) string { + mac := hmac.New(sha256.New, []byte(password)) + mac.Write(msg) + return hex.EncodeToString(mac.Sum(nil)) +} + +// Screen-spy parameters that match the C++ ScreenSpy implementation. +const ( + AlgorithmH264 byte = 2 // ALGORITHM_H264 — H264 encoding (the algorithm web uses) +) + +// Reserved-field indices we care about (see common/commands.h RES_* enum). +// LOGIN_INFOR.szReserved is a '|'-separated list; clients fill known slots +// even when leaving others blank ("?"). +const ( + ResFieldClientType = 0 // RES_CLIENT_TYPE — client kind (Windows / macOS / ...) + ResFieldFilePath = 4 // RES_FILE_PATH — install path + ResFieldInstallTime = 6 // RES_INSTALL_TIME + ResFieldClientLoc = 10 // RES_CLIENT_LOC — geo string + ResFieldClientPubIP = 11 // RES_CLIENT_PUBIP — public IP + ResFieldClientID = 16 // RES_CLIENT_ID — uint64 decimal, matches TOKEN_BITMAPINFO clientID +) + +// ScreenFrameHeaderLen is the size of the small per-frame header prepended by +// the device on every TOKEN_NEXTSCREEN buffer, before the H.264 NAL payload. +// Layout (excluding the leading TOKEN_* byte): +// +// [algorithm:1][cursorPos:8 (int32 x, int32 y)][cursorIdx:1] = 10 bytes +// +// (The C++ side counts the token byte into its ulHeadLength=11; we keep the +// constant strictly post-token so the call site reads `skip := 1 + headerLen` +// without confusion.) SCREENYSPY_IMPROVE adds a 4-byte frameID after the +// cursor index, which is the production-off setting per common/commands.h. +const ScreenFrameHeaderLen = 1 + 8 + 1 + +// IsH264Keyframe scans an Annex-B H.264 bitstream for a NAL unit indicating +// a keyframe boundary — IDR (type 5), SPS (7) or PPS (8). Returns true on +// the first hit. Matches the detection used by the C++ ScreenSpy broadcast +// path so frame-type bytes stay consistent across server implementations. +func IsH264Keyframe(data []byte) bool { + n := len(data) + for i := 0; i+4 < n; i++ { + var nalOffset int + switch { + case data[i] == 0 && data[i+1] == 0 && data[i+2] == 0 && data[i+3] == 1: + nalOffset = i + 4 + case data[i] == 0 && data[i+1] == 0 && data[i+2] == 1: + nalOffset = i + 3 + default: + continue + } + if nalOffset >= n { + continue + } + nalType := data[nalOffset] & 0x1F + if nalType == 5 || nalType == 7 || nalType == 8 { + return true + } + } + return false +} + // LOGIN_INFOR structure size and offsets (matching C++ struct with default alignment) // Note: C++ struct uses default alignment (4-byte for uint32/int) const ( diff --git a/server/go/protocol/commands_test.go b/server/go/protocol/commands_test.go new file mode 100644 index 0000000..1fd9e69 --- /dev/null +++ b/server/go/protocol/commands_test.go @@ -0,0 +1,47 @@ +package protocol + +import "testing" + +func TestSignMessageHMACVector(t *testing.T) { + // Standard HMAC-SHA256 sanity vector. Anchors that SignMessage matches + // the canonical RFC 4231 algorithm so signatures stay interoperable + // with peers that compute the same digest. + got := SignMessage("key", []byte("hello")) + want := "9307b3b915efb5171ff14d8cb55fbcc798c6c0ef1456d66ded1a6aa723a58b7b" + if got != want { + t.Fatalf("SignMessage(key, hello) = %s, want %s", got, want) + } +} + +func TestSignMessageDeterministic(t *testing.T) { + a := SignMessage("test-key", []byte("2026-01-01 12:00:00|123456789")) + b := SignMessage("test-key", []byte("2026-01-01 12:00:00|123456789")) + if a != b { + t.Fatalf("non-deterministic: %s != %s", a, b) + } + if len(a) != 64 { + t.Fatalf("expected 64 hex chars, got %d (%s)", len(a), a) + } +} + +func TestIsH264KeyframeBasic(t *testing.T) { + // 4-byte start code + IDR (NAL type 5) + idr := []byte{0x00, 0x00, 0x00, 0x01, 0x65, 0x88} + if !IsH264Keyframe(idr) { + t.Fatal("IDR should be detected as keyframe") + } + // 3-byte start code + SPS (NAL type 7) + sps := []byte{0x00, 0x00, 0x01, 0x67, 0x42} + if !IsH264Keyframe(sps) { + t.Fatal("SPS should be detected as keyframe") + } + // 4-byte start code + non-IDR slice (NAL type 1) + pframe := []byte{0x00, 0x00, 0x00, 0x01, 0x41, 0x9b} + if IsH264Keyframe(pframe) { + t.Fatal("non-IDR slice should not be detected as keyframe") + } + // Garbage + if IsH264Keyframe([]byte{0xde, 0xad, 0xbe, 0xef}) { + t.Fatal("non-H264 bytes should not match") + } +} diff --git a/server/go/protocol/parser.go b/server/go/protocol/parser.go index 7a4868b..9daf901 100644 --- a/server/go/protocol/parser.go +++ b/server/go/protocol/parser.go @@ -20,6 +20,19 @@ type Parser struct { codec *Codec } +// findHTTPBodyOffset returns the byte offset of the HTTP body — i.e. one past +// the first `\r\n\r\n` separator. Returns -1 if the separator isn't present +// yet (caller should wait for more data). Matches the C++ UnMaskHttp scan in +// common/mask.h. +func findHTTPBodyOffset(data []byte) int { + for i := 0; i+4 <= len(data); i++ { + if data[i] == '\r' && data[i+1] == '\n' && data[i+2] == '\r' && data[i+3] == '\n' { + return i + 4 + } + } + return -1 +} + // NewParser creates a new parser func NewParser() *Parser { return &Parser{ @@ -38,6 +51,22 @@ func (p *Parser) Close() { func (p *Parser) Parse(ctx *connection.Context) ([]byte, error) { buf := ctx.InBuffer + // Strip optional HTTP-mask wrapper. The client may disguise each outbound + // chunk as a `POST / HTTP/1.1\r\n...\r\n\r\n` envelope followed + // by the real binary body (see common/mask.h: HttpMask). Each chunk + // carries its own envelope so we strip every time we see the prefix. + if buf.Len() >= 5 { + head := buf.Peek(5) + if len(head) == 5 && head[0] == 'P' && head[1] == 'O' && head[2] == 'S' && head[3] == 'T' && head[4] == ' ' { + bodyOffset := findHTTPBodyOffset(buf.Bytes()) + if bodyOffset < 0 { + // Headers not fully arrived yet — wait for more bytes. + return nil, ErrNeedMore + } + buf.Skip(bodyOffset) + } + } + // Need at least minimum bytes to determine protocol if buf.Len() < MinComLen { return nil, ErrNeedMore diff --git a/server/go/web/ws.go b/server/go/web/ws.go index 9af98c2..13d07cb 100644 --- a/server/go/web/ws.go +++ b/server/go/web/ws.go @@ -31,27 +31,46 @@ var upgrader = websocket.Upgrader{ // ----- per-connection client state ---------------------------------------- +// wsMsg is one queued WebSocket frame. binary toggles between +// websocket.TextMessage (JSON signaling) and websocket.BinaryMessage +// (screen frames). +type wsMsg struct { + binary bool + data []byte +} + type wsClient struct { conn *websocket.Conn - send chan []byte + send chan wsMsg closed chan struct{} once sync.Once // Mutated under wsHub.mu (or only by the read loop owning this client). - nonce string // outstanding challenge — cleared after a successful login - token string // set once authenticated - role string // mirrors session role after login - addr string // client address for logs + nonce string // outstanding challenge — cleared after a successful login + token string // set once authenticated + role string // mirrors session role after login + addr string // client address for logs + watching string // device ID this browser is currently streaming, "" when on the list } -// queue writes a payload onto the send buffer. Drops silently if the buffer -// is full so a stuck reader can't back-pressure the broadcast path. +// queue writes a JSON text frame onto the send buffer. Drops silently if the +// buffer is full so a stuck reader can't back-pressure the broadcast path. func (c *wsClient) queue(payload []byte) { + c.enqueue(wsMsg{binary: false, data: payload}) +} + +// queueBinary writes a binary WS frame. Used for screen-stream packets. +func (c *wsClient) queueBinary(payload []byte) { + c.enqueue(wsMsg{binary: true, data: payload}) +} + +func (c *wsClient) enqueue(m wsMsg) { select { - case c.send <- payload: + case c.send <- m: case <-c.closed: default: - // queue full — caller is responsible for noticing if it matters. + // queue full — drop (acceptable for video; signaling clients are + // typically not behind enough for the small text buffer to fill). } } @@ -105,6 +124,58 @@ func (h *wsHub) OnDeviceOffline(_ string) { h.broadcastAuthenticated(`{"cmd":"devices_changed"}`) } +// OnCursorChange relays the remote cursor index to every viewer of this +// device. The browser maps the index to a CSS cursor (desktop) or overlay +// SVG variant (touch). Hub already de-duplicates so we always have a real +// transition here. +func (h *wsHub) OnCursorChange(deviceID string, index byte) { + msg := mustJSON(map[string]any{ + "cmd": "cursor", + "index": index, + }) + h.mu.RLock() + defer h.mu.RUnlock() + for c := range h.clients { + if c.watching == deviceID && c.token != "" { + c.queue(msg) + } + } +} + +// OnResolutionChange notifies viewers so the browser-side WebCodecs decoder +// can be (re)initialized with the right frame size. Without this, incoming +// binary frames after connect_result are decoded by an uninitialized +// VideoDecoder and the page stays on "Waiting for video...". +func (h *wsHub) OnResolutionChange(deviceID string, width, height int) { + msg := mustJSON(map[string]any{ + "cmd": "resolution_changed", + "id": deviceID, + "width": width, + "height": height, + }) + h.mu.RLock() + defer h.mu.RUnlock() + for c := range h.clients { + if c.watching == deviceID && c.token != "" { + c.queue(msg) + } + } +} + +// OnScreenFrame ships a screen packet to every browser currently watching +// this device. We hold the read lock for the whole iteration, but each +// queueBinary is non-blocking (drops on backpressure) so a slow viewer +// cannot stall the fast ones. +func (h *wsHub) OnScreenFrame(deviceID string, packet []byte, _ bool) { + h.mu.RLock() + defer h.mu.RUnlock() + for c := range h.clients { + if c.watching == deviceID && c.token != "" { + c.queueBinary(packet) + } + } +} + // OnDeviceUpdate forwards heartbeat-derived liveness data so the device-list // rows can refresh RTT and active-window labels without re-fetching. func (h *wsHub) OnDeviceUpdate(id string, rtt int, activeWindow string) { @@ -144,6 +215,12 @@ func (h *wsHub) unregister(c *wsClient) { h.mu.Lock() delete(h.clients, c) h.mu.Unlock() + // If this client was the last viewer of a device, tear down the screen + // session so the device stops encoding. Done OUTSIDE the lock so the + // hub's mutators can take their own locks without risk of recursion. + if c.watching != "" && h.countWatchers(c.watching) == 0 { + h.devices.CloseScreen(c.watching) + } // Do NOT revoke the token: tokens are session-scoped, not WS-scoped. // Frontend may close+reopen the WS at any time (visibilitychange handler, // brief network blip, reload) and must be able to resume with the same @@ -170,7 +247,7 @@ func (h *wsHub) serve(w http.ResponseWriter, r *http.Request) { client := &wsClient{ conn: conn, - send: make(chan []byte, wsSendBuffer), + send: make(chan wsMsg, wsSendBuffer), closed: make(chan struct{}), nonce: nonce, addr: r.RemoteAddr, @@ -192,8 +269,12 @@ func (h *wsHub) writeLoop(c *wsClient) { for { select { case msg := <-c.send: + msgType := websocket.TextMessage + if msg.binary { + msgType = websocket.BinaryMessage + } _ = c.conn.SetWriteDeadline(time.Now().Add(wsWriteWait)) - if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + if err := c.conn.WriteMessage(msgType, msg.data); err != nil { c.close() return } diff --git a/server/go/web/ws_handlers.go b/server/go/web/ws_handlers.go index 2921f43..45e0cdf 100644 --- a/server/go/web/ws_handlers.go +++ b/server/go/web/ws_handlers.go @@ -2,6 +2,9 @@ package web import ( "encoding/json" + + "github.com/yuanyuanxiang/SimpleRemoter/server/go/hub" + "github.com/yuanyuanxiang/SimpleRemoter/server/go/protocol" ) // dispatch routes one inbound message to its handler. The `raw` payload is @@ -21,12 +24,10 @@ func (h *wsHub) dispatch(c *wsClient, cmd string, raw []byte) { case "ping": // no-op heartbeat; the read itself was the keep-alive signal case "disconnect": - c.queue([]byte(`{"cmd":"disconnect_result","ok":true}`)) + h.handleDisconnect(c, raw) - // Reserved for later phases. Reply with a benign failure so the UI can - // surface a clear error instead of spinning indefinitely. case "connect": - h.replyNotImplemented(c, "connect_result", "Screen sharing not yet implemented on Go server") + h.handleConnect(c, raw) case "rdp_reset": // silently ignored — UI uses this as a fire-and-forget case "mouse", "key": @@ -116,6 +117,125 @@ func (h *wsHub) handleLogin(c *wsClient, raw []byte) { })) } +// handleConnect kicks off a screen-sharing session for the browser. We send +// COMMAND_SCREEN_SPY to the device's main TCP connection; the device then +// opens a new sub-connection (TOKEN_BITMAPINFO) which the TCP side binds to +// the device via hub.BindScreenConn. Frame relay to the browser is handled +// in Phase 4.2 once frames actually arrive. +// +// Reply semantics: returning connect_result.ok=true (without width/height) +// triggers the browser's "Waiting for video..." spinner. We can't deliver +// width/height here because we don't yet know them — they show up in the +// first TOKEN_BITMAPINFO from the device. +// handleDisconnect detaches this client from any device it was watching and +// — if no other authenticated client is still watching — closes the device's +// screen sub-connection. Closing the TCP sub-conn is the signal the C++ +// device firmware uses to stop screen capture, so this is how we ask the +// device to free its encoder. +func (h *wsHub) handleDisconnect(c *wsClient, _ []byte) { + // Mirror handleConnect: take h.mu so event-handler readers + // (OnResolutionChange/OnScreenFrame) get a consistent view of c.watching. + h.mu.Lock() + prev := c.watching + c.watching = "" + h.mu.Unlock() + c.queue([]byte(`{"cmd":"disconnect_result","ok":true}`)) + if prev != "" && h.countWatchers(prev) == 0 { + h.devices.CloseScreen(prev) + } +} + +// countWatchers returns how many authenticated clients still have their +// `watching` field pointing at deviceID. Called from disconnect paths. +func (h *wsHub) countWatchers(deviceID string) int { + h.mu.RLock() + defer h.mu.RUnlock() + n := 0 + for c := range h.clients { + if c.watching == deviceID { + n++ + } + } + return n +} + +func (h *wsHub) handleConnect(c *wsClient, raw []byte) { + if !h.requireAuth(c, raw, "connect_result") { + return + } + var in struct { + ID string `json:"id"` + } + if err := json.Unmarshal(raw, &in); err != nil || in.ID == "" { + c.queue(mustJSON(map[string]any{"cmd": "connect_result", "ok": false, "msg": "Bad request"})) + return + } + + // If a screen session is already live for this device (another browser + // is already watching), reuse it: hand the new viewer the current + // resolution and the most recent IDR keyframe so its decoder can start + // rendering immediately, without waiting for the next IDR (~15 s). + cache := h.devices.ScreenState(in.ID) + if cache.Active { + c.queue(mustJSON(map[string]any{ + "cmd": "connect_result", "ok": true, + "width": cache.Width, "height": cache.Height, + })) + if len(cache.Keyframe) > 0 { + c.queueBinary(cache.Keyframe) + } + h.mu.Lock() + c.watching = in.ID + h.mu.Unlock() + return + } + + // No active session — kick the device to start capturing. We send the + // same 32-byte COMMAND_SCREEN_SPY payload the C++ WebService sends: + // [0]=COMMAND_SCREEN_SPY, [1]=0 (GDI), [2]=ALGORITHM_H264, [3]=1 (multi-screen), + // [4..31]=0. + cmd := make([]byte, 32) + cmd[0] = protocol.CommandScreenSpy + cmd[2] = protocol.AlgorithmH264 + cmd[3] = 1 + + // CRITICAL: bind c.watching BEFORE asking the device to start capturing. + // On fast reconnects the device's screen sub-conn handshake completes in + // <100 ms, so TOKEN_BITMAPINFO and even the first H264 frame can arrive + // before this handler finishes — and the resolution_changed / frame + // broadcasts in wsHub filter on c.watching. With the assignment after + // SendToDevice the new viewer silently misses the very first IDR and + // resolution_changed, leaving the page stuck on "Waiting for video". + // + // The write needs to share the lock event handlers use to read c.watching + // (they iterate h.clients under h.mu.RLock). Without that the write is a + // data race; on a fast reconnect the reader goroutine can keep observing + // the previous value ("") long enough to drop the first resolution_changed + // and the first IDR, which produces the exact "every other quick reconnect + // goes black" symptom — the C++ server avoids it because it does the same + // state mutation under std::mutex and reaps the memory-barrier as a bonus. + h.mu.Lock() + c.watching = in.ID + h.mu.Unlock() + + if err := h.devices.SendToDevice(in.ID, cmd); err != nil { + // Roll back the watching flag if we never managed to kick capture. + h.mu.Lock() + c.watching = "" + h.mu.Unlock() + msg := "Device offline" + if err != hub.ErrDeviceOffline { + msg = "Failed to start screen capture" + h.log.Error("SendToDevice(%s): %v", in.ID, err) + } + c.queue(mustJSON(map[string]any{"cmd": "connect_result", "ok": false, "msg": msg})) + return + } + h.log.Info("[timing] COMMAND_SCREEN_SPY sent to device=%s (cold start)", in.ID) + + c.queue(mustJSON(map[string]any{"cmd": "connect_result", "ok": true})) +} + func (h *wsHub) handleGetDevices(c *wsClient, raw []byte) { if !h.requireAuth(c, raw, "device_list") { return