843 lines
28 KiB
Go
843 lines
28 KiB
Go
// Package hub maintains the registry of currently online devices and acts as
|
|
// the bridge between the TCP server (which sees raw client connections) and
|
|
// the web server (which serves browser clients).
|
|
//
|
|
// The TCP side calls Register / Unregister / UpdateLive / BindScreenConn as
|
|
// the protocol layer notices new sub-connections.
|
|
// The web side calls ListDevices / SendToDevice / Subscribe.
|
|
// Neither side imports the other — both depend only on this package.
|
|
package hub
|
|
|
|
import (
|
|
"errors"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/yuanyuanxiang/SimpleRemoter/server/go/connection"
|
|
"github.com/yuanyuanxiang/SimpleRemoter/server/go/protocol"
|
|
)
|
|
|
|
// ErrDeviceOffline is returned by SendToDevice when the target device is not
|
|
// (no longer) registered.
|
|
var ErrDeviceOffline = errors.New("device offline")
|
|
|
|
// ErrNoSender is returned by SendToDevice if SetSender has not been called.
|
|
var ErrNoSender = errors.New("hub sender not configured")
|
|
|
|
// SendFunc encodes-and-writes raw command bytes to a device's TCP context.
|
|
// In practice this is bound to server.Server.Send at startup.
|
|
type SendFunc func(ctx *connection.Context, data []byte) error
|
|
|
|
// Device is the internal record for one logical end-device (keyed by MasterID).
|
|
// A single device may use multiple TCP sub-connections (screen, terminal …);
|
|
// only the main login connection is stored here.
|
|
//
|
|
// PCName from LOGIN_INFOR is interpreted as "ComputerName/Group" and
|
|
// ModuleVersion as "Version-Capability"; the split halves live in separate
|
|
// fields so the front-end can render them independently.
|
|
type Device struct {
|
|
ID string // MasterID — stable identifier the client reports at login
|
|
Name string // PCName before '/' (real computer name)
|
|
Group string // PCName after '/' (group label; may be empty)
|
|
Version string // ModuleVersion before '-' (semantic version)
|
|
Capability string // ModuleVersion after '-' (capability tags; may be empty)
|
|
OS string // OS version string
|
|
CPU string // from LOGIN_INFOR reserved field 2
|
|
FilePath string // from LOGIN_INFOR reserved field 4
|
|
InstallTime string // from LOGIN_INFOR reserved field 6 (or StartTime)
|
|
Location string // client-reported geo string (reserved field 10)
|
|
PeerIP string // network-level remote address as seen by the server
|
|
PublicIP string // client-reported public IP (reserved field 11)
|
|
Resolution string // client-formatted screen geometry "N:W*H" (reserved field 15)
|
|
ConnectedAt time.Time
|
|
|
|
// Live fields refreshed on every heartbeat. Protected by hub.mu.
|
|
RTT int // network latency in ms (Heartbeat.Ping)
|
|
ActiveWindow string // foreground window title (Heartbeat.ActiveWnd, decoded)
|
|
|
|
// conn is the main connection's context — used by SendToDevice to forward
|
|
// commands (COMMAND_SCREEN_SPY, etc.) to the device.
|
|
conn *connection.Context
|
|
|
|
// screenConn is the device-initiated sub-connection that streams
|
|
// TOKEN_BITMAPINFO / TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames. Bound
|
|
// after the device responds to COMMAND_SCREEN_SPY. Nil while no screen
|
|
// session is active.
|
|
screenConn *connection.Context
|
|
|
|
// Cached screen state, for late-joining browsers. Populated by the
|
|
// PublishResolution / PublishScreenFrame call sites. screenWidth==0 or
|
|
// lastKeyframe==nil indicates "no session" / "no IDR seen yet".
|
|
screenWidth int
|
|
screenHeight int
|
|
lastKeyframe []byte // fully-packed WS binary packet of the most recent IDR
|
|
|
|
// Cursor index dedup: cursors arrive on every frame (~30 Hz) but only
|
|
// rarely change. Suppress duplicates so the WS doesn't carry redundant
|
|
// JSON messages.
|
|
cursorSeen bool
|
|
lastCursorIndex byte
|
|
|
|
// Terminal session state — at most one web terminal per device (MVP
|
|
// constraint shared with the C++ server). All three fields are
|
|
// guarded by hub.mu.
|
|
//
|
|
// terminalPending: COMMAND_SHELL has been sent, waiting for the device's
|
|
// sub-conn to arrive and announce itself via TOKEN_TERMINAL_START /
|
|
// TOKEN_SHELL_START.
|
|
// terminalConn: the shell sub-conn ctx after binding. Nil before BIND
|
|
// and after teardown.
|
|
// terminalIsPTY: distinguishes Linux/macOS/ConPTY (true) from the legacy
|
|
// Windows cmd-pipe path. PTY mode supports resize; cmd-pipe ignores it.
|
|
terminalPending bool
|
|
terminalConn *connection.Context
|
|
terminalIsPTY bool
|
|
}
|
|
|
|
// ScreenCache is a read-only snapshot of a device's last-seen screen state,
|
|
// used by wsHub.handleConnect to bootstrap late joiners.
|
|
type ScreenCache struct {
|
|
Width int
|
|
Height int
|
|
Keyframe []byte // packed WS packet; nil if no keyframe cached yet
|
|
Active bool // true iff a screen sub-conn is currently bound
|
|
}
|
|
|
|
// ScreenState returns a snapshot of the device's current screen state, or
|
|
// an empty struct if the device is unknown. Safe to call from any goroutine.
|
|
func (h *Hub) ScreenState(deviceID string) ScreenCache {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok {
|
|
return ScreenCache{}
|
|
}
|
|
return ScreenCache{
|
|
Width: d.screenWidth,
|
|
Height: d.screenHeight,
|
|
Keyframe: d.lastKeyframe,
|
|
Active: d.screenConn != nil,
|
|
}
|
|
}
|
|
|
|
// MainConn exposes the device's main TCP context for callers that need to
|
|
// send commands directly. Returns nil if the device is not registered.
|
|
func (h *Hub) MainConn(id string) *connection.Context {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
if d, ok := h.devices[id]; ok {
|
|
return d.conn
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Capability returns the device's reported capability hex string
|
|
// (LOGIN_INFOR.moduleVersion tail). Empty for unknown devices — callers
|
|
// should treat that as "no caps" (legacy Windows GBK default).
|
|
func (h *Hub) Capability(id string) string {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
if d, ok := h.devices[id]; ok {
|
|
return d.Capability
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// DeviceInfo is the JSON-safe projection of Device for the /api/devices
|
|
// endpoint and the WS device_list message. Field names match what the
|
|
// existing browser front-end expects.
|
|
type DeviceInfo struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Group string `json:"group,omitempty"`
|
|
Version string `json:"version"`
|
|
Capability string `json:"capability,omitempty"`
|
|
OS string `json:"os"`
|
|
CPU string `json:"cpu,omitempty"`
|
|
FilePath string `json:"file_path,omitempty"`
|
|
InstallTime string `json:"install_time,omitempty"`
|
|
Location string `json:"location,omitempty"`
|
|
IP string `json:"ip"` // client-reported public IP (matches C++ key)
|
|
PeerIP string `json:"peer_ip,omitempty"`
|
|
Screen string `json:"screen,omitempty"` // "N:W*H" — matches C++ DeviceInfo.screen key
|
|
RTT int `json:"rtt"`
|
|
ActiveWindow string `json:"activeWindow,omitempty"`
|
|
ConnectedAt int64 `json:"connected_at"`
|
|
Online bool `json:"online"`
|
|
}
|
|
|
|
// EventHandler receives notifications about device lifecycle, per-tick live
|
|
// updates, screen frames and resolution changes. Methods are invoked
|
|
// synchronously from the corresponding hub mutator — implementations must
|
|
// be non-blocking (typically just write to a channel or queue).
|
|
type EventHandler interface {
|
|
OnDeviceOnline(d DeviceInfo)
|
|
OnDeviceOffline(id string)
|
|
OnDeviceUpdate(id string, rtt int, activeWindow string)
|
|
// OnScreenFrame delivers a fully-formed WS binary packet for the given
|
|
// device. The packet matches the C++ layout:
|
|
// [DeviceID:4 LE][FrameType:1][DataLen:4 LE][H264:N]
|
|
// Implementations should treat the slice as read-only.
|
|
OnScreenFrame(deviceID string, packet []byte, isKeyframe bool)
|
|
// OnResolutionChange fires when a screen session starts (TOKEN_BITMAPINFO)
|
|
// or whenever the device reports a new screen geometry mid-stream.
|
|
OnResolutionChange(deviceID string, width, height int)
|
|
// OnCursorChange fires when the device's foreground cursor index changes.
|
|
// Duplicates (same index as the previous frame) are filtered out by the
|
|
// hub before reaching subscribers.
|
|
OnCursorChange(deviceID string, index byte)
|
|
// OnTerminalReady fires once the device's shell sub-conn is bound and
|
|
// the server has sent COMMAND_NEXT to start its output read loop.
|
|
// isPTY=true means PTY mode (Linux/macOS or ConPTY); false means the
|
|
// legacy Windows cmd-pipe path which doesn't support resize.
|
|
OnTerminalReady(deviceID string, isPTY bool)
|
|
// OnTerminalData ships one chunk of raw shell output (already wrapped
|
|
// in the WS-binary "TRM1" magic header) to terminal viewers.
|
|
OnTerminalData(deviceID string, packet []byte)
|
|
// OnTerminalClosed fires when the session ends — either because the
|
|
// device sent TOKEN_TERMINAL_CLOSE, the sub-conn dropped, or the
|
|
// server explicitly tore it down.
|
|
OnTerminalClosed(deviceID string, reason string)
|
|
}
|
|
|
|
// Hub is a thread-safe registry of online devices.
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
devices map[string]*Device
|
|
subMu sync.RWMutex
|
|
subscribers []EventHandler
|
|
|
|
sender SendFunc
|
|
|
|
// Reverse index: TCP context -> device ID for the device's screen
|
|
// sub-connection. Lets us clean up on raw-connection close without
|
|
// having to walk every device. Empty when no screen sessions exist.
|
|
screenIndex map[*connection.Context]string
|
|
screenIndexMu sync.RWMutex
|
|
|
|
// Parallel reverse index for terminal sub-conns. Same purpose: O(1)
|
|
// lookup from a raw ctx (e.g. on OnDisconnect) back to its device.
|
|
terminalIndex map[*connection.Context]string
|
|
terminalIndexMu sync.RWMutex
|
|
}
|
|
|
|
// New returns an empty Hub.
|
|
func New() *Hub {
|
|
return &Hub{
|
|
devices: make(map[string]*Device),
|
|
screenIndex: make(map[*connection.Context]string),
|
|
terminalIndex: make(map[*connection.Context]string),
|
|
}
|
|
}
|
|
|
|
// SetSender wires the function used to deliver outbound bytes on a device's
|
|
// main TCP connection. Typically called once in main() with server.Send.
|
|
func (h *Hub) SetSender(fn SendFunc) {
|
|
h.sender = fn
|
|
}
|
|
|
|
// SendToDevice forwards an already-formed command payload to the device's
|
|
// main connection. data should be the raw command bytes (the sender takes
|
|
// care of framing / compression at the protocol layer).
|
|
func (h *Hub) SendToDevice(id string, data []byte) error {
|
|
h.mu.RLock()
|
|
d, ok := h.devices[id]
|
|
h.mu.RUnlock()
|
|
if !ok || d.conn == nil {
|
|
return ErrDeviceOffline
|
|
}
|
|
if h.sender == nil {
|
|
return ErrNoSender
|
|
}
|
|
return h.sender(d.conn, data)
|
|
}
|
|
|
|
// SendToScreen routes a payload to the device's currently-bound screen
|
|
// sub-connection. Input events (COMMAND_SCREEN_CONTROL) MUST go through the
|
|
// screen sub-conn rather than the main conn — the C++ client only dispatches
|
|
// these commands from CScreenManager::OnReceive, which reads exclusively from
|
|
// the sub-conn (see client/ScreenManager.cpp:1065). Returns ErrDeviceOffline
|
|
// when the device is unknown OR has no active screen session, so callers can
|
|
// quietly drop input from browsers that haven't called connect yet.
|
|
func (h *Hub) SendToScreen(id string, data []byte) error {
|
|
h.mu.RLock()
|
|
d, ok := h.devices[id]
|
|
var sc *connection.Context
|
|
if ok {
|
|
sc = d.screenConn
|
|
}
|
|
h.mu.RUnlock()
|
|
if !ok || sc == nil {
|
|
return ErrDeviceOffline
|
|
}
|
|
if h.sender == nil {
|
|
return ErrNoSender
|
|
}
|
|
return h.sender(sc, data)
|
|
}
|
|
|
|
// BindScreenConn associates a freshly-arrived sub-connection (the one that
|
|
// just sent TOKEN_BITMAPINFO) with the device identified by clientID.
|
|
// Returns false if the device is not registered — callers should drop the
|
|
// orphan connection in that case.
|
|
//
|
|
// If the device already has a screen sub-conn bound (typically because the
|
|
// client's monitor-poll logic opened a new one without the previous viewer
|
|
// going through CloseScreen — e.g. multiple open/close cycles where each
|
|
// cycle picks a different display), the previous sub-conn is retired via
|
|
// retireScreenConn. Without this, both sub-conns keep streaming under the
|
|
// same device ID and the browser sees H.264 frames from two encoders
|
|
// interleaved, rendering as a picture that jumps between monitors.
|
|
func (h *Hub) BindScreenConn(deviceID string, ctx *connection.Context) bool {
|
|
if deviceID == "" || ctx == nil {
|
|
return false
|
|
}
|
|
h.mu.Lock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok {
|
|
h.mu.Unlock()
|
|
return false
|
|
}
|
|
old := d.screenConn
|
|
d.screenConn = ctx
|
|
// The cached resolution and last IDR belong to the old encoder's
|
|
// stream. A viewer joining now must wait for the new sub-conn's
|
|
// TOKEN_BITMAPINFO + first IDR; serving the old monitor's keyframe
|
|
// to a decoder that's about to receive a different SPS/PPS would
|
|
// produce the same mixed-stream corruption retireScreenConn exists
|
|
// to prevent.
|
|
replacing := old != nil && old != ctx
|
|
if replacing {
|
|
d.screenWidth = 0
|
|
d.screenHeight = 0
|
|
d.lastKeyframe = nil
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
h.screenIndexMu.Lock()
|
|
h.screenIndex[ctx] = deviceID
|
|
h.screenIndexMu.Unlock()
|
|
|
|
if replacing {
|
|
h.retireScreenConn(old)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ScreenDeviceID returns the device ID whose screen sub-connection this
|
|
// context represents, or "" if the context is not a screen sub-connection.
|
|
// Used by the TCP layer to route TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames.
|
|
func (h *Hub) ScreenDeviceID(ctx *connection.Context) string {
|
|
h.screenIndexMu.RLock()
|
|
defer h.screenIndexMu.RUnlock()
|
|
return h.screenIndex[ctx]
|
|
}
|
|
|
|
// UnbindScreenConn removes the screen sub-connection mapping (called on TCP
|
|
// disconnect of a screen sub-context). No-op if the context isn't tracked.
|
|
func (h *Hub) UnbindScreenConn(ctx *connection.Context) {
|
|
h.screenIndexMu.Lock()
|
|
deviceID, ok := h.screenIndex[ctx]
|
|
if !ok {
|
|
h.screenIndexMu.Unlock()
|
|
return
|
|
}
|
|
delete(h.screenIndex, ctx)
|
|
h.screenIndexMu.Unlock()
|
|
|
|
h.mu.Lock()
|
|
if d, ok := h.devices[deviceID]; ok && d.screenConn == ctx {
|
|
d.screenConn = nil
|
|
// Clear the cache too — when this device's screen comes back up, the
|
|
// resolution and IDR will be republished fresh.
|
|
d.screenWidth = 0
|
|
d.screenHeight = 0
|
|
d.lastKeyframe = nil
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
// retireScreenConn tears down a screen sub-conn that's being replaced or
|
|
// closed. Shared by CloseScreen (last viewer left) and BindScreenConn (device
|
|
// opened a new sub-conn that's superseding this one, e.g. when the client's
|
|
// monitor-poll logic switches to a different display).
|
|
//
|
|
// Steps and ordering matter:
|
|
//
|
|
// 1. screenIndex entry is dropped FIRST so any in-flight frames still in the
|
|
// device's TCP send buffer arrive at handleScreenFrame with deviceID=""
|
|
// and are silently dropped. Without this they'd be relayed under the
|
|
// same deviceID as the new sub-conn — that's the "frames from two
|
|
// monitors interleaved in one stream" symptom: the browser decoder sees
|
|
// a mix of two independent x264 SPS/PPS sequences and renders an
|
|
// alternating / glitchy picture.
|
|
//
|
|
// 2. COMMAND_BYE is sent so the C++ client's IOCPClient exits via the
|
|
// clean StopRunning() path. Without it the client treats FIN as a
|
|
// network blip and fires m_ReconnectFunc, which opens a fresh sub-conn
|
|
// that never sends BITMAPINFO again, keeps the encoder thread alive,
|
|
// and ultimately wedges the device for ~10 s — see the original
|
|
// CloseScreen comment for the full failure mode.
|
|
//
|
|
// 3. The actual TCP Close happens 500 ms later on a goroutine, mirroring
|
|
// the C++ ScreenSpyDlg.cpp:842 (Sleep(500); CancelIO()) sequence so
|
|
// COMMAND_BYE has time to reach the device read loop before FIN does.
|
|
//
|
|
// No-op for a nil sc so callers can pass d.screenConn unconditionally.
|
|
func (h *Hub) retireScreenConn(sc *connection.Context) {
|
|
if sc == nil {
|
|
return
|
|
}
|
|
h.screenIndexMu.Lock()
|
|
delete(h.screenIndex, sc)
|
|
h.screenIndexMu.Unlock()
|
|
if h.sender != nil {
|
|
_ = h.sender(sc, []byte{protocol.CommandBye})
|
|
}
|
|
go func(c *connection.Context) {
|
|
time.Sleep(500 * time.Millisecond)
|
|
c.Close()
|
|
}(sc)
|
|
}
|
|
|
|
// Subscribe registers an EventHandler. The returned func removes it.
|
|
// Multiple handlers are supported; each receives every event.
|
|
func (h *Hub) Subscribe(eh EventHandler) (unsubscribe func()) {
|
|
h.subMu.Lock()
|
|
h.subscribers = append(h.subscribers, eh)
|
|
h.subMu.Unlock()
|
|
return func() {
|
|
h.subMu.Lock()
|
|
defer h.subMu.Unlock()
|
|
for i, x := range h.subscribers {
|
|
if x == eh {
|
|
h.subscribers = append(h.subscribers[:i], h.subscribers[i+1:]...)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Hub) snapshotSubscribers() []EventHandler {
|
|
h.subMu.RLock()
|
|
defer h.subMu.RUnlock()
|
|
out := make([]EventHandler, len(h.subscribers))
|
|
copy(out, h.subscribers)
|
|
return out
|
|
}
|
|
|
|
// Register records a device as online and pins the main TCP connection that
|
|
// will receive outbound commands via SendToDevice. Re-registering an existing
|
|
// ID overwrites the previous entry (e.g. a client reconnect with the same
|
|
// MasterID). A nil device, nil conn, or empty ID is silently ignored.
|
|
// Subscribers are notified after the device is added.
|
|
func (h *Hub) Register(d *Device, conn *connection.Context) {
|
|
if d == nil || d.ID == "" || conn == nil {
|
|
return
|
|
}
|
|
d.conn = conn
|
|
h.mu.Lock()
|
|
h.devices[d.ID] = d
|
|
info := deviceToInfo(d)
|
|
h.mu.Unlock()
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnDeviceOnline(info)
|
|
}
|
|
}
|
|
|
|
// Unregister removes a device by ID. No-op if not present.
|
|
// Subscribers are notified after the device is removed (only if it existed).
|
|
func (h *Hub) Unregister(id string) {
|
|
if id == "" {
|
|
return
|
|
}
|
|
h.mu.Lock()
|
|
_, existed := h.devices[id]
|
|
delete(h.devices, id)
|
|
h.mu.Unlock()
|
|
if !existed {
|
|
return
|
|
}
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnDeviceOffline(id)
|
|
}
|
|
}
|
|
|
|
// ListDevices returns a fresh snapshot slice. The caller may mutate it freely;
|
|
// it shares no state with the hub.
|
|
//
|
|
// Sort by ConnectedAt asc (ID as tiebreaker) so the order stays stable across
|
|
// REST polls and WS pushes — Go's map iteration is intentionally randomized,
|
|
// which would otherwise reshuffle the UI list on every refresh.
|
|
func (h *Hub) ListDevices() []DeviceInfo {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
out := make([]DeviceInfo, 0, len(h.devices))
|
|
for _, d := range h.devices {
|
|
out = append(out, deviceToInfo(d))
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
if out[i].ConnectedAt != out[j].ConnectedAt {
|
|
return out[i].ConnectedAt < out[j].ConnectedAt
|
|
}
|
|
return out[i].ID < out[j].ID
|
|
})
|
|
return out
|
|
}
|
|
|
|
func deviceToInfo(d *Device) DeviceInfo {
|
|
return DeviceInfo{
|
|
ID: d.ID,
|
|
Name: d.Name,
|
|
Group: d.Group,
|
|
Version: d.Version,
|
|
Capability: d.Capability,
|
|
OS: d.OS,
|
|
CPU: d.CPU,
|
|
FilePath: d.FilePath,
|
|
InstallTime: d.InstallTime,
|
|
Location: d.Location,
|
|
IP: d.PublicIP,
|
|
PeerIP: d.PeerIP,
|
|
Screen: d.Resolution,
|
|
RTT: d.RTT,
|
|
ActiveWindow: d.ActiveWindow,
|
|
ConnectedAt: d.ConnectedAt.Unix(),
|
|
Online: true, // a device that's in the map is by definition online
|
|
}
|
|
}
|
|
|
|
// PublishCursor notifies subscribers when the device reports a new cursor
|
|
// index. Repeated identical indices are suppressed so the WS isn't spammed
|
|
// with per-frame cursor JSON. No-op for unknown devices.
|
|
func (h *Hub) PublishCursor(deviceID string, index byte) {
|
|
h.mu.Lock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
if d.cursorSeen && d.lastCursorIndex == index {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
d.cursorSeen = true
|
|
d.lastCursorIndex = index
|
|
h.mu.Unlock()
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnCursorChange(deviceID, index)
|
|
}
|
|
}
|
|
|
|
// CloseScreen tears down the active screen sub-connection for the device,
|
|
// if any. Used when the last viewer leaves so the device stops capturing.
|
|
//
|
|
// Cache (screenConn / screenWidth / lastKeyframe) is cleared SYNCHRONOUSLY
|
|
// here, not deferred to the eventual OnDisconnect → UnbindScreenConn path.
|
|
// Otherwise a new viewer arriving in the brief window between TCP close and
|
|
// the disconnect callback would see Active=true with stale dimensions/IDR
|
|
// and skip the COMMAND_SCREEN_SPY kick, leaving the page stuck on a "connected"
|
|
// status with no frames ever arriving.
|
|
//
|
|
// The actual sub-conn teardown is delegated to retireScreenConn, which is
|
|
// shared with BindScreenConn's replacement path.
|
|
func (h *Hub) CloseScreen(deviceID string) {
|
|
h.mu.Lock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
sc := d.screenConn
|
|
d.screenConn = nil
|
|
d.screenWidth = 0
|
|
d.screenHeight = 0
|
|
d.lastKeyframe = nil
|
|
h.mu.Unlock()
|
|
h.retireScreenConn(sc)
|
|
}
|
|
|
|
// PublishResolution announces a new (or first-ever) screen geometry for a
|
|
// device. The browser uses width/height to initialize its WebCodecs decoder.
|
|
// The latest dimensions are also cached on the Device so future late-joining
|
|
// viewers can be bootstrapped without waiting for the next BITMAPINFO.
|
|
func (h *Hub) PublishResolution(deviceID string, width, height int) {
|
|
h.mu.Lock()
|
|
if d, ok := h.devices[deviceID]; ok {
|
|
d.screenWidth = width
|
|
d.screenHeight = height
|
|
}
|
|
h.mu.Unlock()
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnResolutionChange(deviceID, width, height)
|
|
}
|
|
}
|
|
|
|
// PublishScreenFrame fans out a screen frame packet to all subscribers.
|
|
// Callers must have already wrapped the H.264 NAL payload in the
|
|
// [DeviceID:4][FrameType:1][DataLen:4][...] header expected by the browser.
|
|
// The packet slice is shared with subscribers — do not mutate after publish.
|
|
//
|
|
// Keyframe packets are also retained on the Device record so a new viewer
|
|
// joining a live session can immediately receive a decodable starting point
|
|
// instead of waiting up to ~15 s for the next IDR.
|
|
func (h *Hub) PublishScreenFrame(deviceID string, packet []byte, isKeyframe bool) {
|
|
if isKeyframe {
|
|
h.mu.Lock()
|
|
if d, ok := h.devices[deviceID]; ok {
|
|
d.lastKeyframe = packet
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnScreenFrame(deviceID, packet, isKeyframe)
|
|
}
|
|
}
|
|
|
|
// UpdateLive applies a heartbeat-derived RTT and active-window title to the
|
|
// device's live fields, then notifies subscribers. No-op if the device is
|
|
// not registered (e.g. heartbeat arriving for a connection that never sent
|
|
// TOKEN_LOGIN or has already disconnected).
|
|
func (h *Hub) UpdateLive(id string, rtt int, activeWindow string) {
|
|
if id == "" {
|
|
return
|
|
}
|
|
h.mu.Lock()
|
|
d, ok := h.devices[id]
|
|
if !ok {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
d.RTT = rtt
|
|
d.ActiveWindow = activeWindow
|
|
h.mu.Unlock()
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnDeviceUpdate(id, rtt, activeWindow)
|
|
}
|
|
}
|
|
|
|
// ----- Terminal session management (Phase 6) --------------------------------
|
|
|
|
// ErrTerminalBusy is returned by OpenTerminalSession when the device already
|
|
// has a pending or active terminal session — MVP enforces single-viewer.
|
|
var ErrTerminalBusy = errors.New("terminal already open by another viewer")
|
|
|
|
// OpenTerminalSession atomically marks a terminal session as pending for the
|
|
// device, then sends COMMAND_SHELL on the main TCP connection so the device
|
|
// will spawn a shell sub-conn. Returns nil if the request was sent. On any
|
|
// failure the pending flag is rolled back so retries are possible.
|
|
//
|
|
// Single-viewer constraint: if a pending or bound session already exists,
|
|
// returns ErrTerminalBusy. Mirrors C++ CWebService::HandleTermOpen
|
|
// (server/2015Remote/WebService.cpp:1838).
|
|
func (h *Hub) OpenTerminalSession(deviceID string) error {
|
|
if deviceID == "" {
|
|
return ErrDeviceOffline
|
|
}
|
|
h.mu.Lock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok || d.conn == nil {
|
|
h.mu.Unlock()
|
|
return ErrDeviceOffline
|
|
}
|
|
if d.terminalPending || d.terminalConn != nil {
|
|
h.mu.Unlock()
|
|
return ErrTerminalBusy
|
|
}
|
|
d.terminalPending = true
|
|
mainConn := d.conn
|
|
h.mu.Unlock()
|
|
|
|
if h.sender == nil {
|
|
// Roll back so a retry isn't permanently blocked.
|
|
h.mu.Lock()
|
|
d.terminalPending = false
|
|
h.mu.Unlock()
|
|
return ErrNoSender
|
|
}
|
|
if err := h.sender(mainConn, []byte{protocol.CommandShell}); err != nil {
|
|
h.mu.Lock()
|
|
d.terminalPending = false
|
|
h.mu.Unlock()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsTerminalPending tells the TCP layer whether the next-arriving shell
|
|
// sub-conn should be claimed by the web terminal. The C++ side uses this
|
|
// in MessageHandle to decide between WebService takeover and opening an
|
|
// MFC dialog (server/2015Remote/2015RemoteDlg.cpp:5753).
|
|
func (h *Hub) IsTerminalPending(deviceID string) bool {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
d, ok := h.devices[deviceID]
|
|
return ok && d.terminalPending
|
|
}
|
|
|
|
// BindTerminalConn promotes the pending session to an active one by
|
|
// associating the device's freshly-arrived shell sub-conn. Returns false
|
|
// if no pending session exists — callers should drop the orphan ctx.
|
|
//
|
|
// Subscribers receive OnTerminalReady AFTER binding so they can flip the
|
|
// browser into "ready" state immediately on the same TCP roundtrip that
|
|
// will deliver the first shell output.
|
|
func (h *Hub) BindTerminalConn(deviceID string, ctx *connection.Context, isPTY bool) bool {
|
|
if deviceID == "" || ctx == nil {
|
|
return false
|
|
}
|
|
h.mu.Lock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok || !d.terminalPending {
|
|
h.mu.Unlock()
|
|
return false
|
|
}
|
|
d.terminalConn = ctx
|
|
d.terminalIsPTY = isPTY
|
|
d.terminalPending = false
|
|
h.mu.Unlock()
|
|
|
|
h.terminalIndexMu.Lock()
|
|
h.terminalIndex[ctx] = deviceID
|
|
h.terminalIndexMu.Unlock()
|
|
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnTerminalReady(deviceID, isPTY)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// TerminalDeviceID returns the device ID whose terminal sub-conn this
|
|
// context belongs to, or "" otherwise. The TCP layer uses this on every
|
|
// inbound packet on a sub-conn — when non-empty, the bytes are raw shell
|
|
// output and bypass the usual command-byte switch.
|
|
func (h *Hub) TerminalDeviceID(ctx *connection.Context) string {
|
|
h.terminalIndexMu.RLock()
|
|
defer h.terminalIndexMu.RUnlock()
|
|
return h.terminalIndex[ctx]
|
|
}
|
|
|
|
// UnbindTerminalConn removes the terminal mapping (called from the TCP
|
|
// disconnect path for any sub-conn ctx). Fires OnTerminalClosed once if
|
|
// the unbind actually removed something — so subscribers can update the
|
|
// browser even on unexpected device-side drops.
|
|
func (h *Hub) UnbindTerminalConn(ctx *connection.Context) {
|
|
h.terminalIndexMu.Lock()
|
|
deviceID, tracked := h.terminalIndex[ctx]
|
|
if !tracked {
|
|
h.terminalIndexMu.Unlock()
|
|
return
|
|
}
|
|
delete(h.terminalIndex, ctx)
|
|
h.terminalIndexMu.Unlock()
|
|
|
|
h.mu.Lock()
|
|
if d, ok := h.devices[deviceID]; ok && d.terminalConn == ctx {
|
|
d.terminalConn = nil
|
|
d.terminalPending = false
|
|
d.terminalIsPTY = false
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnTerminalClosed(deviceID, "disconnected")
|
|
}
|
|
}
|
|
|
|
// SendToTerminal forwards bytes (typically xterm.js keystrokes) to the
|
|
// device's shell sub-conn. Returns ErrDeviceOffline if no session is
|
|
// active for this device.
|
|
func (h *Hub) SendToTerminal(id string, data []byte) error {
|
|
h.mu.RLock()
|
|
d, ok := h.devices[id]
|
|
var tc *connection.Context
|
|
if ok {
|
|
tc = d.terminalConn
|
|
}
|
|
h.mu.RUnlock()
|
|
if !ok || tc == nil {
|
|
return ErrDeviceOffline
|
|
}
|
|
if h.sender == nil {
|
|
return ErrNoSender
|
|
}
|
|
return h.sender(tc, data)
|
|
}
|
|
|
|
// TerminalIsPTY reports whether the active session is PTY mode (the
|
|
// resize command only applies in PTY mode — legacy cmd-pipe ignores it).
|
|
func (h *Hub) TerminalIsPTY(id string) bool {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
d, ok := h.devices[id]
|
|
return ok && d.terminalConn != nil && d.terminalIsPTY
|
|
}
|
|
|
|
// CloseTerminalSession tears down the session from the server side
|
|
// (typically when the requesting browser sends term_close or disconnects).
|
|
// Mirrors CloseScreen's graceful pattern: drop the index synchronously,
|
|
// send COMMAND_BYE, then close after a short grace period so the client's
|
|
// IOCPClient reconnect logic doesn't fire.
|
|
func (h *Hub) CloseTerminalSession(deviceID string) {
|
|
h.mu.Lock()
|
|
d, ok := h.devices[deviceID]
|
|
if !ok {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
tc := d.terminalConn
|
|
// hadSession guards against firing spurious OnTerminalClosed events
|
|
// when there was nothing to tear down — relevant when the main-conn
|
|
// teardown path calls CloseTerminalSession unconditionally as part of
|
|
// device-offline cleanup, or when both OnDisconnect and an explicit
|
|
// browser term_close race for the same teardown.
|
|
hadSession := tc != nil || d.terminalPending
|
|
d.terminalConn = nil
|
|
d.terminalPending = false
|
|
d.terminalIsPTY = false
|
|
h.mu.Unlock()
|
|
|
|
if !hadSession {
|
|
return
|
|
}
|
|
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnTerminalClosed(deviceID, "closed")
|
|
}
|
|
|
|
if tc == nil {
|
|
return
|
|
}
|
|
h.terminalIndexMu.Lock()
|
|
delete(h.terminalIndex, tc)
|
|
h.terminalIndexMu.Unlock()
|
|
|
|
// Mirror Hub.CloseScreen: send COMMAND_BYE then close after 500 ms so
|
|
// the device exits its shell read loop instead of treating the FIN as
|
|
// a network blip and triggering reconnect.
|
|
if h.sender != nil {
|
|
_ = h.sender(tc, []byte{protocol.CommandBye})
|
|
}
|
|
go func(c *connection.Context) {
|
|
time.Sleep(500 * time.Millisecond)
|
|
c.Close()
|
|
}(tc)
|
|
}
|
|
|
|
// PublishTerminalData fans out one chunk of shell output to subscribers.
|
|
// Caller has already wrapped it in the "TRM1" magic header so the browser
|
|
// can demultiplex from screen frames over the shared WebSocket.
|
|
func (h *Hub) PublishTerminalData(deviceID string, packet []byte) {
|
|
for _, s := range h.snapshotSubscribers() {
|
|
s.OnTerminalData(deviceID, packet)
|
|
}
|
|
}
|
|
|
|
// Count returns the current number of online devices.
|
|
func (h *Hub) Count() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.devices)
|
|
}
|